You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/27 21:52:45 UTC
[ignite-3] branch main updated: IGNITE-14957 Introduced the
NetworkAddress class. Fixes #185
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3fdeaf4 IGNITE-14957 Introduced the NetworkAddress class. Fixes #185
3fdeaf4 is described below
commit 3fdeaf416226a9463adc12890bbfefdf7781d5e2
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Jun 28 00:50:57 2021 +0300
IGNITE-14957 Introduced the NetworkAddress class. Fixes #185
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
examples/pom-standalone.xml | 4 +-
examples/pom.xml | 2 +-
.../affinity/RendezvousAffinityFunctionTest.java | 18 +--
.../client/ITMetaStorageServiceTest.java | 19 ++-
modules/network-api/pom.xml | 6 +
.../ignite/network/ClusterLocalConfiguration.java | 6 +-
.../org/apache/ignite/network/ClusterNode.java | 49 ++------
.../apache/ignite/network/MessagingService.java | 8 +-
.../org/apache/ignite/network/NetworkAddress.java | 118 +++++++++++++++++++
.../ignite/network/NetworkMessageHandler.java | 4 +-
.../org/apache/ignite/network/TopologyService.java | 2 +-
.../apache/ignite/network/NetworkAddressTest.java | 80 +++++++++++++
.../network/scalecube/ITNodeRestartsTest.java | 15 ++-
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 14 +--
.../scalecube/ScaleCubeClusterServiceFactory.java | 22 ++--
.../ScaleCubeDirectMarshallerTransport.java | 3 +-
.../scalecube/ScaleCubeMessagingService.java | 37 +++---
.../scalecube/ScaleCubeTopologyService.java | 9 +-
.../java/org/apache/ignite/raft/client/Peer.java | 29 ++---
.../raft/client/service/RaftGroupServiceTest.java | 120 +++++++++----------
.../ignite/raft/jraft/core/ITCliServiceTest.java | 6 +-
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 12 +-
.../raft/server/ITJRaftCounterServerTest.java | 22 ++--
.../raft/server/ITSimpleCounterServerTest.java | 9 +-
.../ignite/raft/server/RaftServerAbstractTest.java | 3 +-
.../internal/raft/server/impl/JRaftServerImpl.java | 53 ++++-----
.../internal/raft/server/impl/RaftServerImpl.java | 9 +-
.../org/apache/ignite/raft/jraft/JRaftUtils.java | 17 ++-
.../apache/ignite/raft/jraft/entity/PeerId.java | 7 +-
.../apache/ignite/raft/jraft/rpc/RpcContext.java | 5 +-
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 40 ++++---
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 5 +-
.../rpc/impl/client/ActionRequestProcessor.java | 35 +++---
.../rpc/impl/client/GetLeaderRequestProcessor.java | 10 +-
.../rpc/impl/client/SnapshotRequestProcessor.java | 18 ++-
.../apache/ignite/raft/jraft/core/TestCluster.java | 13 +-
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 12 +-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 10 +-
.../ignite/raft/jraft/test/MockAsyncContext.java | 9 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 10 +-
modules/table/pom.xml | 6 +
.../ignite/distributed/ITDistributedTableTest.java | 22 ++--
.../ignite/internal/table/TableManagerTest.java | 131 +++++++++------------
43 files changed, 614 insertions(+), 415 deletions(-)
diff --git a/examples/pom-standalone.xml b/examples/pom-standalone.xml
index 23da513..960d980 100644
--- a/examples/pom-standalone.xml
+++ b/examples/pom-standalone.xml
@@ -24,7 +24,7 @@
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-examples</artifactId>
- <version>3.0.0-alpha2</version>
+ <version>3.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
@@ -35,7 +35,7 @@
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-runner</artifactId>
- <version>3.0.0-alpha2</version>
+ <version>3.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
diff --git a/examples/pom.xml b/examples/pom.xml
index fd5fee8..0bb9f56 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -30,7 +30,7 @@
</parent>
<artifactId>ignite-examples</artifactId>
- <version>3.0.0-alpha2</version>
+ <version>3.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index e51fd40..206c09f 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -24,9 +24,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
@@ -53,7 +56,7 @@ public class RendezvousAffinityFunctionTest {
int replicas = 4;
- ArrayList<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+ List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
assertTrue(parts > nodes, "Partitions should be more that nodes");
@@ -98,12 +101,13 @@ public class RendezvousAffinityFunctionTest {
}
}
- @NotNull private ArrayList<ClusterNode> prepareNetworkTopology(int nodes) {
- ArrayList<ClusterNode> clusterNodes = new ArrayList<>(nodes);
+ @NotNull private List<ClusterNode> prepareNetworkTopology(int nodes) {
+ var addr = new NetworkAddress("127.0.0.1", 121212);
- for (int i = 0; i < nodes; i++)
- clusterNodes.add(new ClusterNode(UUID.randomUUID().toString(), "Node " + i, "127.0.0.1", 121212));
- return clusterNodes;
+ return IntStream.range(0, nodes)
+ .mapToObj(i -> "Node " + i)
+ .map(name -> new ClusterNode(UUID.randomUUID().toString(), name, addr))
+ .collect(Collectors.toUnmodifiableList());
}
@Test
@@ -114,7 +118,7 @@ public class RendezvousAffinityFunctionTest {
int replicas = 4;
- ArrayList<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+ List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
assertTrue(parts > nodes, "Partitions should be more that nodes");
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index aae7d25..9051f12 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
@@ -155,18 +156,16 @@ public class ITMetaStorageServiceTest {
}
/**
- * Run @{code} NODES cluster nodes.
+ * Run {@code NODES} cluster nodes.
*/
@BeforeEach
public void beforeTest() {
- for (int i = 0; i < NODES; i++) {
- cluster.add(
- startClusterNode(
- "node_" + i,
- NODE_PORT_BASE + i,
- IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES).boxed().
- map((port) -> "localhost:" + port).collect(Collectors.toList())));
- }
+ List<NetworkAddress> servers = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
+ .mapToObj(port -> new NetworkAddress("localhost", port))
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < NODES; i++)
+ cluster.add(startClusterNode("node_" + i, NODE_PORT_BASE + i, servers));
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
@@ -968,7 +967,7 @@ public class ITMetaStorageServiceTest {
* @param srvs Server nodes of the cluster.
* @return The client cluster view.
*/
- private ClusterService startClusterNode(String name, int port, List<String> srvs) {
+ private ClusterService startClusterNode(String name, int port, List<NetworkAddress> srvs) {
var ctx = new ClusterLocalConfiguration(name, port, srvs, SERIALIZATION_REGISTRY);
var net = NETWORK_FACTORY.createClusterService(ctx);
diff --git a/modules/network-api/pom.xml b/modules/network-api/pom.xml
index 5029dda..c8514cf 100644
--- a/modules/network-api/pom.xml
+++ b/modules/network-api/pom.xml
@@ -40,6 +40,12 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index e3d1a4d..535eb81 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -34,7 +34,7 @@ public class ClusterLocalConfiguration {
private final int port;
/** Addresses of other nodes. */
- private final List<String> memberAddresses;
+ private final List<NetworkAddress> memberAddresses;
/** Message mapper providers. */
private final MessageSerializationRegistry serializationRegistry;
@@ -46,7 +46,7 @@ public class ClusterLocalConfiguration {
* @param serializationRegistry Message serialization registry.
*/
public ClusterLocalConfiguration(
- String name, int port, List<String> memberAddresses, MessageSerializationRegistry serializationRegistry
+ String name, int port, List<NetworkAddress> memberAddresses, MessageSerializationRegistry serializationRegistry
) {
this.name = name;
this.port = port;
@@ -71,7 +71,7 @@ public class ClusterLocalConfiguration {
/**
* @return Addresses of other nodes.
*/
- public List<String> getMemberAddresses() {
+ public List<NetworkAddress> getMemberAddresses() {
return memberAddresses;
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
index 9ed5378..b10b14a 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -18,7 +18,6 @@ package org.apache.ignite.network;
import java.io.Serializable;
import java.util.Objects;
-import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
/**
@@ -31,27 +30,18 @@ public class ClusterNode implements Serializable {
/** Unique name of member in the cluster. Consistent between restarts. */
private final String name;
- /** Node host. */
- private final String host;
-
- /** Node port. */
- private final int port;
-
- /** Node address in host:port format (lazily evaluated) */
- @IgniteToStringExclude
- private String address;
+ /** Network address of this node. */
+ private final NetworkAddress address;
/**
- * @param id local id that changes between restarts
- * @param name unique name of a member in a cluster
- * @param host node host
- * @param port node port
+ * @param id Local id that changes between restarts.
+ * @param name Unique name of a member in a cluster.
+ * @param address Node address.
*/
- public ClusterNode(String id, String name, String host, int port) {
+ public ClusterNode(String id, String name, NetworkAddress address) {
this.id = id;
this.name = name;
- this.host = host;
- this.port = port;
+ this.address = address;
}
/**
@@ -69,29 +59,12 @@ public class ClusterNode implements Serializable {
}
/**
- * @return Node host name.
- */
- public String host() {
- return host;
- }
-
- /**
- * @return The address.
+ * @return Network address of this node.
*/
- public String address() {
- if (address == null)
- address = host + ":" + port;
-
+ public NetworkAddress address() {
return address;
}
- /**
- * @return Node port.
- */
- public int port() {
- return port;
- }
-
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
@@ -99,12 +72,12 @@ public class ClusterNode implements Serializable {
if (o == null || getClass() != o.getClass())
return false;
ClusterNode that = (ClusterNode)o;
- return port == that.port && name.equals(that.name) && host.equals(that.host);
+ return name.equals(that.name) && address.equals(that.address);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return Objects.hash(name, host, port);
+ return Objects.hash(name, address);
}
/** {@inheritDoc} */
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 4312ab9..56e0b77 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -60,12 +60,12 @@ public interface MessagingService {
/**
* Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the given correlation ID to the given message.
*
- * @param addr Recipient network address in host:port format.
+ * @param addr Recipient network address.
* @param msg Message which should be delivered.
* @param correlationId Correlation id when replying to the request.
* @return Future of the send operation.
*/
- CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId);
+ CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId);
/**
* Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
@@ -82,12 +82,12 @@ public interface MessagingService {
* Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
* returns a future that will be completed successfully upon receiving a response.
*
- * @param addr Recipient network address in host:port format.
+ * @param addr Recipient network address.
* @param msg A message.
* @param timeout Waiting for response timeout in milliseconds.
* @return A future holding the response or error if the expected response was not received.
*/
- CompletableFuture<NetworkMessage> invoke(String addr, NetworkMessage msg, long timeout);
+ CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
/**
* Registers a handler for network message events.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java
new file mode 100644
index 0000000..5094196
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A representation of a network address consisting of a host name and a port.
+ */
+public class NetworkAddress implements Serializable {
+ /** Regexp for parsing "host:port" strings; the host group is greedy, so the split happens at the last colon. */
+ private static final Pattern ADDRESS_PATTERN = Pattern.compile("(.+):(\\d+)");
+
+ /** Host. */
+ private final String host;
+
+ /** Port. */
+ private final int port;
+
+ /**
+ * @param host Host (stored as is, no validation is performed here).
+ * @param port Port (stored as is, no range check is performed here).
+ */
+ public NetworkAddress(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
+ * Parses a {@code NetworkAddress} from a string in the "host:port" format, splitting at the last colon.
+ *
+ * @param addrStr String in "host:port" format; the port part must consist of digits only.
+ * @return Parsed address.
+ * @throws IllegalArgumentException If the string does not match the format or the port does not fit into an {@code int}.
+ */
+ public static NetworkAddress from(String addrStr) {
+ Matcher matcher = ADDRESS_PATTERN.matcher(addrStr);
+
+ if (!matcher.matches())
+ throw new IllegalArgumentException("Unable to parse the network address from: " + addrStr);
+
+ String host = matcher.group(1);
+
+ String portStr = matcher.group(2);
+
+ int port;
+
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException("Illegal port format: " + portStr, ex);
+ }
+
+ return new NetworkAddress(host, port);
+ }
+
+ /**
+ * Creates a {@code NetworkAddress} from the {@link InetSocketAddress#getHostName()} and port of a socket address.
+ * NOTE(review): getHostName() may trigger a reverse name lookup for IP-literal addresses; confirm this is intended.
+ * @param addr Socket address to convert.
+ */
+ public static NetworkAddress from(InetSocketAddress addr) {
+ return new NetworkAddress(addr.getHostName(), addr.getPort());
+ }
+
+ /**
+ * @return Host, as passed to the constructor.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Port, as passed to the constructor.
+ */
+ public int port() {
+ return port;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ NetworkAddress address = (NetworkAddress)o;
+ return port == address.port && host.equals(address.host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(host, port);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return host + ":" + port;
+ }
+}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 94e2e0b..5172705 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -23,8 +23,8 @@ public interface NetworkMessageHandler {
/**
* @param message Message which was received from the cluster.
* @param senderAddr Sender address. Use
- * {@link TopologyService#getByAddress(String)} to resolve a cluster node.
+ * {@link TopologyService#getByAddress} to resolve the corresponding {@link ClusterNode}.
* @param correlationId Correlation id.
*/
- void onReceived(NetworkMessage message, String senderAddr, String correlationId);
+ void onReceived(NetworkMessage message, NetworkAddress senderAddr, String correlationId);
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
index b7fac7d..22eb214 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -49,5 +49,5 @@ public interface TopologyService {
* @param addr The address.
* @return The node or {@code null} if the node is not yet discovered or dead.
*/
- @Nullable ClusterNode getByAddress(String addr);
+ @Nullable ClusterNode getByAddress(NetworkAddress addr);
}
diff --git a/modules/network-api/src/test/java/org/apache/ignite/network/NetworkAddressTest.java b/modules/network-api/src/test/java/org/apache/ignite/network/NetworkAddressTest.java
new file mode 100644
index 0000000..065419b
--- /dev/null
+++ b/modules/network-api/src/test/java/org/apache/ignite/network/NetworkAddressTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test suite for {@link NetworkAddress}: parsing from strings and creation from an {@link InetSocketAddress}.
+ */
+class NetworkAddressTest {
+ /**
+ * Test parsing of a {@link NetworkAddress} from a well-formed "host:port" string.
+ */
+ @Test
+ void testFromString() {
+ var addr = NetworkAddress.from("foobar:1234");
+
+ assertThat(addr.host(), is("foobar"));
+ assertThat(addr.port(), is(1234));
+ }
+
+ /**
+ * Test that when the host part itself contains a colon, the string is split into host and port at the last colon.
+ */
+ @Test
+ void testFromStringWithColon() {
+ var addr = NetworkAddress.from("foo:bar:1234");
+
+ assertThat(addr.host(), is("foo:bar"));
+ assertThat(addr.port(), is(1234));
+ }
+
+ /**
+ * Test that a string without a colon-separated port is rejected with an {@link IllegalArgumentException}.
+ */
+ @Test
+ void testFromMalformedString() {
+ assertThrows(IllegalArgumentException.class, () -> NetworkAddress.from("abcdef"));
+ }
+
+ /**
+ * Test that a string whose port part is not numeric is rejected with an {@link IllegalArgumentException}.
+ */
+ @Test
+ void testFromMalformedPortString() {
+ assertThrows(IllegalArgumentException.class, () -> NetworkAddress.from("foobar:baz"));
+ }
+
+ /**
+ * Test creation of a {@link NetworkAddress} from an unresolved {@link InetSocketAddress}.
+ */
+ @Test
+ void testFromInetAddr() {
+ var inetAddr = InetSocketAddress.createUnresolved("foobar", 1234);
+ var addr = NetworkAddress.from(inetAddr);
+
+ assertThat(addr.host(), is("foobar"));
+ assertThat(addr.port(), is(1234));
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index 7426fe7..02af912 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
@@ -60,14 +61,16 @@ class ITNodeRestartsTest {
final int initPort = 3344;
String addr = "localhost";
- List<String> addresses = IntStream.range(0, 5).mapToObj(i -> addr + ":" + (initPort + i)).collect(toList());
+ List<NetworkAddress> addresses = IntStream.range(0, 5)
+ .mapToObj(i -> new NetworkAddress(addr, (initPort + i)))
+ .collect(toList());
services = new ArrayList<>(addresses.size());
for (int i = 0; i < addresses.size(); i++) {
- String address = addresses.get(i);
+ NetworkAddress address = addresses.get(i);
- ClusterService svc = startNetwork(address, initPort + i, addresses);
+ ClusterService svc = startNetwork(address.toString(), initPort + i, addresses);
services.add(svc);
}
@@ -87,11 +90,11 @@ class ITNodeRestartsTest {
services.get(idx1).shutdown();
LOG.info("Starting {}", addresses.get(idx0));
- ClusterService svc0 = startNetwork(addresses.get(idx0), initPort + idx0, addresses);
+ ClusterService svc0 = startNetwork(addresses.get(idx0).toString(), initPort + idx0, addresses);
services.set(idx0, svc0);
LOG.info("Starting {}", addresses.get(idx1));
- ClusterService svc2 = startNetwork(addresses.get(idx1), initPort + idx1, addresses);
+ ClusterService svc2 = startNetwork(addresses.get(idx1).toString(), initPort + idx1, addresses);
services.set(idx1, svc2);
for (ClusterService service : services) {
@@ -103,7 +106,7 @@ class ITNodeRestartsTest {
}
/** */
- private ClusterService startNetwork(String name, int port, List<String> addresses) {
+ private ClusterService startNetwork(String name, int port, List<NetworkAddress> addresses) {
var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 28c1956..058a013 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
@@ -91,9 +92,8 @@ class ITScaleCubeNetworkMessagingTest {
ClusterService alice = testCluster.members.get(0);
- for (ClusterNode member : alice.topologyService().allMembers()) {
+ for (ClusterNode member : alice.topologyService().allMembers())
alice.messagingService().weakSend(member, testMessage);
- }
boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
assertTrue(messagesReceived);
@@ -137,11 +137,11 @@ class ITScaleCubeNetworkMessagingTest {
class Data {
private final TestMessage message;
- private final String sender;
+ private final NetworkAddress sender;
private final String correlationId;
- private Data(TestMessage message, String sender, String correlationId) {
+ private Data(TestMessage message, NetworkAddress sender, String correlationId) {
this.message = message;
this.sender = sender;
this.correlationId = correlationId;
@@ -279,8 +279,8 @@ class ITScaleCubeNetworkMessagingTest {
int initialPort = 3344;
- List<String> addresses = IntStream.range(0, numOfNodes)
- .mapToObj(i -> String.format("localhost:%d", initialPort + i))
+ List<NetworkAddress> addresses = IntStream.range(0, numOfNodes)
+ .mapToObj(i -> new NetworkAddress("localhost", initialPort + i))
.collect(Collectors.toUnmodifiableList());
members = IntStream.range(0, numOfNodes)
@@ -297,7 +297,7 @@ class ITScaleCubeNetworkMessagingTest {
* @param initial Whether this node is the first one.
* @return Started cluster node.
*/
- private ClusterService startNode(String name, int port, List<String> addresses, boolean initial) {
+ private ClusterService startNode(String name, int port, List<NetworkAddress> addresses, boolean initial) {
var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index cc4c5d1..be24cac 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -40,7 +40,7 @@ import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.NetworkConfigurationException;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -141,18 +141,14 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
}
/**
- * Convert string addresses to ScaleCube's {@link Address}es.
- * @param addresses "host:port" formatted strings.
- * @return List of addresses.
+ * Converts the given list of {@link NetworkAddress} into a list of ScaleCube's {@link Address}.
+ *
+ * @param addresses Network addresses to convert.
+ * @return List of ScaleCube's {@link Address}.
*/
- private static List<Address> parseAddresses(List<String> addresses) {
- try {
- return addresses.stream()
- .map(Address::from)
- .collect(Collectors.toList());
- }
- catch (IllegalArgumentException e) {
- throw new NetworkConfigurationException("Failed to parse address", e);
- }
+ private static List<Address> parseAddresses(List<NetworkAddress> addresses) {
+ return addresses.stream()
+ .map(addr -> Address.create(addr.host(), addr.port()))
+ .collect(Collectors.toList());
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index 5a3e61d..706a1c8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
@@ -164,7 +165,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
return Mono.fromFuture(() -> {
- ClusterNode node = topologyService.getByAddress(address.toString());
+ ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
String consistentId = node != null ? node.name() : null;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index a0c75e7..9910eb7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -17,15 +17,16 @@
package org.apache.ignite.network.scalecube;
-import io.scalecube.cluster.Cluster;
-import io.scalecube.cluster.transport.api.Message;
-import io.scalecube.net.Address;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.net.Address;
import org.apache.ignite.network.AbstractMessagingService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
@@ -55,23 +56,25 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
void fireEvent(Message message) {
NetworkMessage msg = message.data();
+ var address = NetworkAddress.from(message.header(Message.HEADER_SENDER));
+
String correlationId = message.correlationId();
for (NetworkMessageHandler handler : getMessageHandlers())
- handler.onReceived(msg, message.header(Message.HEADER_SENDER), correlationId);
+ handler.onReceived(msg, address, correlationId);
}
/** {@inheritDoc} */
@Override public void weakSend(ClusterNode recipient, NetworkMessage msg) {
cluster
- .send(clusterNodeAddress(recipient), Message.fromData(msg))
+ .send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
.subscribe();
}
/** {@inheritDoc} */
@Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
return cluster
- .send(clusterNodeAddress(recipient), Message.fromData(msg))
+ .send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
.toFuture();
}
@@ -80,16 +83,15 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
return send(recipient.address(), msg, correlationId);
}
- @Override public CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId) {
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
var message = Message
.withData(msg)
.correlationId(correlationId)
.build();
- Address address = Address.from(addr);
-
return cluster
- .send(address, message)
+ .send(fromNetworkAddress(addr), message)
.toFuture();
}
@@ -99,28 +101,23 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
}
/** {@inheritDoc} */
- @Override public CompletableFuture<NetworkMessage> invoke(String addr, NetworkMessage msg, long timeout) {
+ @Override public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout) {
var message = Message
.withData(msg)
.correlationId(UUID.randomUUID().toString())
.build();
- Address address = Address.from(addr);
-
return cluster
- .requestResponse(address, message)
+ .requestResponse(fromNetworkAddress(addr), message)
.timeout(Duration.ofMillis(timeout))
.toFuture()
.thenApply(m -> m == null ? null : m.data()); // The result can be null on node stopping.
}
/**
- * Extracts the given node's {@link Address}.
- *
- * @param node Node.
- * @return Node's address.
+ * Converts a {@link NetworkAddress} into ScaleCube's {@link Address}.
*/
- private static Address clusterNodeAddress(ClusterNode node) {
- return Address.create(node.host(), node.port());
+ private static Address fromNetworkAddress(NetworkAddress address) {
+ return Address.create(address.host(), address.port());
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 0fc3f22..0b1924c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -25,6 +25,7 @@ import io.scalecube.cluster.membership.MembershipEvent;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.AbstractTopologyService;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
@@ -39,7 +40,7 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
private ClusterNode localMember;
/** Topology members. */
- private final ConcurrentMap<String, ClusterNode> members = new ConcurrentHashMap<>();
+ private final ConcurrentMap<NetworkAddress, ClusterNode> members = new ConcurrentHashMap<>();
/**
* Sets the ScaleCube's local {@link Member}.
@@ -119,7 +120,7 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
}
/** {@inheritDoc} */
- @Override public ClusterNode getByAddress(String addr) {
+ @Override public ClusterNode getByAddress(NetworkAddress addr) {
return members.get(addr);
}
@@ -127,6 +128,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
* Converts the given {@link Member} to a {@link ClusterNode}.
*/
private static ClusterNode fromMember(Member member) {
- return new ClusterNode(member.id(), member.alias(), member.address().host(), member.address().port());
+ var addr = new NetworkAddress(member.address().host(), member.address().port());
+
+ return new ClusterNode(member.id(), member.alias(), addr);
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
index a088a2b..451d3a4 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
@@ -18,7 +18,9 @@
package org.apache.ignite.raft.client;
import java.io.Serializable;
+import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.NetworkAddress;
/**
* A participant of a replication group.
@@ -27,7 +29,7 @@ public final class Peer implements Serializable {
/**
* Network address.
*/
- private final String addr;
+ private final NetworkAddress addr;
/**
* Peer's local priority value, if node don't support priority election,
@@ -46,7 +48,7 @@ public final class Peer implements Serializable {
/**
* @param addr The address.
*/
- public Peer(String addr) {
+ public Peer(NetworkAddress addr) {
this(addr, ElectionPriority.DISABLED);
}
@@ -54,7 +56,7 @@ public final class Peer implements Serializable {
* @param addr The address.
* @param priority Election priority.
*/
- public Peer(String addr, int priority) {
+ public Peer(NetworkAddress addr, int priority) {
this.addr = addr;
this.priority = priority;
}
@@ -62,7 +64,7 @@ public final class Peer implements Serializable {
/**
* @return The address.
*/
- public String address() {
+ public NetworkAddress address() {
return this.addr;
}
@@ -75,22 +77,17 @@ public final class Peer implements Serializable {
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Peer peer = (Peer) o;
-
- if (priority != peer.priority) return false;
- if (!addr.equals(peer.addr)) return false;
-
- return true;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ Peer peer = (Peer)o;
+ return priority == peer.priority && addr.equals(peer.addr);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = addr.hashCode();
- result = 31 * result + priority;
- return result;
+ return Objects.hash(addr, priority);
}
/** {@inheritDoc} */
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index ca0c346..0570cb5 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -25,10 +25,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.WriteCommand;
@@ -45,10 +48,8 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import static java.util.List.of;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -59,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;
@@ -72,11 +72,10 @@ public class RaftGroupServiceTest {
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceTest.class);
/** */
- private static final List<Peer> NODES = of(
- new Peer("localhost:20000"),
- new Peer("localhost:20001"),
- new Peer("localhost:20002")
- );
+ private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
+ .map(port -> new NetworkAddress("localhost", port))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
/** */
private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
@@ -420,7 +419,9 @@ public class RaftGroupServiceTest {
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
- CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
+ var addr = new NetworkAddress("localhost", 8082);
+
+ CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
try {
fut.get();
@@ -444,7 +445,9 @@ public class RaftGroupServiceTest {
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
- CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
+ var addr = new NetworkAddress("localhost", 8082);
+
+ CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
try {
fut.get();
@@ -461,14 +464,22 @@ public class RaftGroupServiceTest {
* @param peer Fail the request targeted to given peer.
*/
private void mockUserInput(boolean delay, @Nullable Peer peer) {
- Mockito.doAnswer(invocation -> {
- String target = invocation.getArgument(0);
+ when(messagingService.invoke(
+ any(NetworkAddress.class),
+ argThat(new ArgumentMatcher<ActionRequest>() {
+ @Override public boolean matches(ActionRequest arg) {
+ return arg.command() instanceof TestCommand;
+ }
+ }),
+ anyLong()
+ )).then(invocation -> {
+ NetworkAddress target = invocation.getArgument(0);
if (peer != null && target.equals(peer.address()))
return failedFuture(new IgniteInternalException(new ConnectException()));
if (delay) {
- return new CompletableFuture<>().completeAsync(() -> {
+ return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
@@ -490,68 +501,51 @@ public class RaftGroupServiceTest {
resp = FACTORY.actionResponse().result(new TestResponse()).build();
return completedFuture(resp);
- })
- .when(messagingService)
- .invoke(
- anyString(),
- argThat(new ArgumentMatcher<ActionRequest>() {
- @Override public boolean matches(ActionRequest arg) {
- return arg.command() instanceof TestCommand;
- }
- }),
- anyLong()
- );
+ });
}
/**
* @param delay {@code True} to delay response.
*/
private void mockLeaderRequest(boolean delay) {
- Mockito.doAnswer(invocation -> {
- if (delay) {
- return new CompletableFuture<>().completeAsync(() -> {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- fail();
- }
-
- return FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- });
- }
-
- Object resp;
-
- Peer leader0 = leader;
-
- if (leader0 == null) {
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- }
- else {
- resp = FACTORY.getLeaderResponse().leader(leader0).build();
- }
-
- return completedFuture(resp);
- })
- .when(messagingService)
- .invoke(anyString(), any(GetLeaderRequest.class), anyLong());
+ when(messagingService.invoke(any(NetworkAddress.class), any(GetLeaderRequest.class), anyLong()))
+ .then(invocation -> {
+ if (delay) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ fail();
+ }
+
+ return FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
+ });
+ }
+
+ Peer leader0 = leader;
+
+ Object resp = leader0 == null ?
+ FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build() :
+ FACTORY.getLeaderResponse().leader(leader0).build();
+
+ return completedFuture(resp);
+ });
}
/**
* @param mode Mock mode.
*/
private void mockSnapshotRequest(int mode) {
- Mockito.doAnswer(invocation -> {
- if (mode == 0) {
- return completedFuture(FACTORY.raftErrorResponse().errorCode(RaftErrorCode.SNAPSHOT).
- errorMessage("Failed to create a snapshot").build());
- }
- else
- return failedFuture(new IgniteInternalException("Very bad"));
- })
- .when(messagingService)
- .invoke(anyString(), any(SnapshotRequest.class), anyLong());
+ when(messagingService.invoke(any(NetworkAddress.class), any(SnapshotRequest.class), anyLong()))
+ .then(invocation -> {
+ if (mode == 0) {
+ return completedFuture(FACTORY.raftErrorResponse().errorCode(RaftErrorCode.SNAPSHOT).
+ errorMessage("Failed to create a snapshot").build());
+ }
+ else
+ return failedFuture(new IgniteInternalException("Very bad"));
+ });
}
/** */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index a7730be..eb8c11d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.CliService;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -112,7 +113,10 @@ public class ITCliServiceTest {
CliOptions opts = new CliOptions();
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
- List<String> memberAddresses = peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+ List<NetworkAddress> memberAddresses = peers.stream()
+ .map(PeerId::getEndpoint)
+ .map(JRaftUtils::addressFromEndpoint)
+ .collect(Collectors.toList());
var registry = new MessageSerializationRegistryImpl();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 5ac11c7..369cc38 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -41,6 +41,7 @@ import java.util.stream.Stream;
import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.Iterator;
@@ -2815,7 +2816,7 @@ public class ITNodeTest {
@Test
public void testBootStrapWithSnapshot() throws Exception {
- Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
+ Endpoint addr = new Endpoint("127.0.0.1", 5006);
MockStateMachine fsm = new MockStateMachine(addr);
for (char ch = 'a'; ch <= 'z'; ch++)
@@ -2857,7 +2858,7 @@ public class ITNodeTest {
@Test
public void testBootStrapWithoutSnapshot() throws Exception {
- Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
+ Endpoint addr = new Endpoint("127.0.0.1", 5006);
MockStateMachine fsm = new MockStateMachine(addr);
BootstrapOptions opts = new BootstrapOptions();
@@ -3392,11 +3393,12 @@ public class ITNodeTest {
private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
Configuration initialConf = nodeOptions.getInitialConf();
- var servers = List.<String>of();
+ var servers = List.<NetworkAddress>of();
if (initialConf != null) {
servers = Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream())
- .map(id -> id.getEndpoint().toString())
+ .map(PeerId::getEndpoint)
+ .map(JRaftUtils::addressFromEndpoint)
.collect(Collectors.toList());
}
@@ -3426,7 +3428,7 @@ public class ITNodeTest {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+ private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
var registry = new TestMessageSerializationRegistryImpl();
var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index f3c19cc..4f4f0ac 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -24,11 +24,14 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.exception.RaftException;
@@ -92,10 +95,10 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
/**
* Initial configuration.
*/
- private static final List<Peer> INITIAL_CONF = List.of(
- new Peer(getLocalAddress() + ":" + PORT),
- new Peer(getLocalAddress() + ":" + (PORT + 1)),
- new Peer(getLocalAddress() + ":" + (PORT + 2)));
+ private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
/**
* Listener factory.
@@ -148,8 +151,9 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @return Raft server instance.
*/
private JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
- ClusterService service = clusterService("server" + idx, PORT + idx,
- List.of(getLocalAddress() + ":" + PORT), true);
+ var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+ ClusterService service = clusterService("server" + idx, PORT + idx, List.of(addr), true);
JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY) {
@Override public void shutdown() throws Exception {
@@ -173,10 +177,10 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @return The client.
*/
private RaftGroupService startClient(String groupId) {
- String addr = getLocalAddress() + ":" + PORT;
+ var addr = new NetworkAddress(getLocalAddress(), PORT);
- ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
- List.of(addr), true);
+ ClusterService clientNode = clusterService(
+ "client_" + groupId + "_", CLIENT_PORT + clients.size(), List.of(addr), true);
RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
List.of(new Peer(addr)), false, 200) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
index eb2fc18..38aec61 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
@@ -72,9 +73,9 @@ class ITSimpleCounterServerTest extends RaftServerAbstractTest {
void before(TestInfo testInfo) {
LOG.info(">>>> Starting test {}", testInfo.getTestMethod().orElseThrow().getName());
- String id = "localhost:" + PORT;
+ var addr = new NetworkAddress("localhost", PORT);
- ClusterService service = clusterService(id, PORT, List.of(), true);
+ ClusterService service = clusterService(addr.toString(), PORT, List.of(), true);
server = new RaftServerImpl(service, FACTORY) {
@Override public synchronized void shutdown() throws Exception {
@@ -89,7 +90,7 @@ class ITSimpleCounterServerTest extends RaftServerAbstractTest {
server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())));
server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())));
- ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(id), true);
+ ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(addr), true);
client1 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, clientNode1, FACTORY, 1000,
List.of(new Peer(serverNode.address())), false, 200) {
@@ -100,7 +101,7 @@ class ITSimpleCounterServerTest extends RaftServerAbstractTest {
}
};
- ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(id), true);
+ ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(addr), true);
client2 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_1, clientNode2, FACTORY, 1000,
List.of(new Peer(serverNode.address())), false, 200) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index c81ee1d..a34c3ad 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
@@ -54,7 +55,7 @@ abstract class RaftServerAbstractTest {
* @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- protected ClusterService clusterService(String name, int port, List<String> servers, boolean start) {
+ protected ClusterService clusterService(String name, int port, List<NetworkAddress> servers, boolean start) {
var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index db9a3e8..8b77e79 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -22,11 +22,11 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.ElectionPriority;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.WriteCommand;
@@ -48,10 +48,11 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
-import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
+
/**
* Raft server implementation on top of forked JRaft library.
*/
@@ -134,9 +135,7 @@ public class JRaftServerImpl implements RaftServer {
public String getServerDataPath(String groupId) {
ClusterNode clusterNode = service.topologyService().localMember();
- Endpoint endpoint = new Endpoint(clusterNode.host(), clusterNode.port());
-
- return this.dataPath + File.separator + groupId + "_" + endpoint.toString().replace(':', '_');
+ return this.dataPath + File.separator + groupId + "_" + clusterNode.address().toString().replace(':', '_');
}
/** {@inheritDoc} */
@@ -148,9 +147,6 @@ public class JRaftServerImpl implements RaftServer {
// Thread pools are shared by all raft groups.
final NodeOptions nodeOptions = opts.copy();
- ClusterNode clusterNode = service.topologyService().localMember();
- Endpoint endpoint = new Endpoint(clusterNode.host(), clusterNode.port());
-
final String serverDataPath = getServerDataPath(groupId);
new File(serverDataPath).mkdirs();
@@ -170,8 +166,11 @@ public class JRaftServerImpl implements RaftServer {
nodeOptions.setRpcClient(client);
- final RaftGroupService server = new RaftGroupService(groupId, new PeerId(endpoint, 0,
- ElectionPriority.DISABLED), nodeOptions, rpcServer, nodeManager, true);
+ NetworkAddress addr = service.topologyService().localMember().address();
+
+ var peerId = new PeerId(addr.host(), addr.port(), 0, ElectionPriority.DISABLED);
+
+ var server = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager, true);
server.start();
@@ -201,7 +200,7 @@ public class JRaftServerImpl implements RaftServer {
PeerId peerId = service.getRaftNode().getNodeId().getPeerId();
- return new Peer(peerId.getEndpoint().toString(), peerId.getPriority());
+ return new Peer(addressFromEndpoint(peerId.getEndpoint()), peerId.getPriority());
}
/**
@@ -240,16 +239,16 @@ public class JRaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override public void onApply(Iterator iter) {
try {
- listener.onWrite(new java.util.Iterator<CommandClosure<WriteCommand>>() {
+ listener.onWrite(new java.util.Iterator<>() {
@Override public boolean hasNext() {
return iter.hasNext();
}
@Override public CommandClosure<WriteCommand> next() {
- @Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
+ @Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>)iter.done();
ByteBuffer data = iter.getData();
- return new CommandClosure<WriteCommand>() {
+ return new CommandClosure<>() {
@Override public WriteCommand command() {
return JDKMarshaller.DEFAULT.unmarshall(data.array());
}
@@ -277,22 +276,20 @@ public class JRaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override public void onSnapshotSave(SnapshotWriter writer, Closure done) {
try {
- listener.onSnapshotSave(writer.getPath(), new Consumer<Throwable>() {
- @Override public void accept(Throwable res) {
- if (res == null) {
- File file = new File(writer.getPath());
-
- for (File file0 : file.listFiles()) {
- if (file0.isFile())
- writer.addFile(file0.getName(), null);
- }
+ listener.onSnapshotSave(writer.getPath(), res -> {
+ if (res == null) {
+ File file = new File(writer.getPath());
- done.run(Status.OK());
- }
- else {
- done.run(new Status(RaftError.EIO, "Fail to save snapshot to %s, reason %s",
- writer.getPath(), res.getMessage()));
+ for (File file0 : file.listFiles()) {
+ if (file0.isFile())
+ writer.addFile(file0.getName(), null);
}
+
+ done.run(Status.OK());
+ }
+ else {
+ done.run(new Status(RaftError.EIO, "Fail to save snapshot to %s, reason %s",
+ writer.getPath(), res.getMessage()));
}
});
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 8aea56e..0386379 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -29,6 +29,7 @@ import java.util.function.BiConsumer;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
@@ -91,7 +92,9 @@ public class RaftServerImpl implements RaftServer {
service.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
if (message instanceof GetLeaderRequest) {
- GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(service.topologyService().localMember().address())).build();
+ var localPeer = new Peer(service.topologyService().localMember().address());
+
+ GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
service.messagingService().send(senderAddr, resp, correlationId);
}
@@ -171,7 +174,7 @@ public class RaftServerImpl implements RaftServer {
* @param <T> Command type.
*/
private <T extends Command> void handleActionRequest(
- String sender,
+ NetworkAddress sender,
ActionRequest req,
String corellationId,
BlockingQueue<CommandClosureEx<T>> queue,
@@ -222,7 +225,7 @@ public class RaftServerImpl implements RaftServer {
}
}
- private void sendError(String sender, String corellationId, RaftErrorCode errorCode) {
+ private void sendError(NetworkAddress sender, String corellationId, RaftErrorCode errorCode) {
RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
service.messagingService().send(sender, resp, corellationId);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 699b1cd..a3bf263 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.Scheduler;
@@ -187,17 +188,13 @@ public final class JRaftUtils {
}
/**
- * Create a Endpoint instance from a string in the form of "host:port", returns null when string is blank.
+ * Creates a {@link NetworkAddress} from an {@link Endpoint}.
+ *
+ * @param endpoint Endpoint.
+ * @return Network address.
*/
- public static Endpoint getEndPoint(final String s) {
- if (StringUtils.isBlank(s)) {
- return null;
- }
- final String[] tmps = StringUtils.split(s, ':');
- if (tmps.length != 2) {
- throw new IllegalArgumentException("Invalid endpoint string: " + s);
- }
- return new Endpoint(tmps[0], Integer.parseInt(tmps[1]));
+ public static NetworkAddress addressFromEndpoint(Endpoint endpoint) {
+ return new NetworkAddress(endpoint.getIp(), endpoint.getPort());
}
private JRaftUtils() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
index bb3e971..094e3f1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/PeerId.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.entity;
import java.io.Serializable;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.core.ElectionPriority;
import org.apache.ignite.raft.jraft.util.AsciiStringUtil;
@@ -106,6 +107,10 @@ public class PeerId implements Copiable<PeerId>, Serializable, Checksum {
this.idx = idx;
}
+ public PeerId(NetworkAddress address) {
+ this(address.host(), address.port());
+ }
+
public PeerId(final String ip, final int port) {
this(ip, port, 0);
}
@@ -283,6 +288,6 @@ public class PeerId implements Copiable<PeerId>, Serializable, Checksum {
}
public static PeerId fromPeer(Peer p) {
- return PeerId.parsePeer(p.address() + ":0:" + p.getPriority());
+ return new PeerId(p.address().host(), p.address().port(), 0, p.getPriority());
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcContext.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcContext.java
index cef732d..239a67d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcContext.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.rpc;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.NodeManager;
/**
@@ -39,12 +40,12 @@ public interface RpcContext {
*
* @return Remote address.
*/
- String getRemoteAddress();
+ NetworkAddress getRemoteAddress();
/**
* Get the local address of the server.
*
* @return Local address.
*/
- String getLocalAddress();
+ NetworkAddress getLocalAddress();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 1cd2909..dee12eb 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -23,9 +23,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
@@ -38,11 +38,15 @@ import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
+import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
+
public class IgniteRpcClient implements RpcClientEx {
private volatile BiPredicate<Object, String> recordPred;
+
private BiPredicate<Object, String> blockPred;
private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>();
+
private LinkedBlockingQueue<Object[]> recordedMsgs = new LinkedBlockingQueue<>();
private final ClusterService service;
@@ -60,7 +64,9 @@ public class IgniteRpcClient implements RpcClientEx {
/** {@inheritDoc} */
@Override public boolean checkConnection(Endpoint endpoint) {
- return service.topologyService().getByAddress(endpoint.toString()) != null;
+ NetworkAddress addr = addressFromEndpoint(endpoint);
+
+ return service.topologyService().getByAddress(addr) != null;
}
/** {@inheritDoc} */
@@ -75,7 +81,7 @@ public class IgniteRpcClient implements RpcClientEx {
InvokeContext ctx,
InvokeCallback callback,
long timeoutMs
- ) throws InterruptedException, RemotingException {
+ ) {
CompletableFuture<Message> fut = new CompletableFuture<>();
fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).
@@ -103,11 +109,12 @@ public class IgniteRpcClient implements RpcClientEx {
synchronized (this) {
if (blockPred != null && blockPred.test(request, endpoint.toString())) {
blockedMsgs.add(new Object[] {
- request, endpoint.toString(), fut.hashCode(), System.currentTimeMillis(), new Runnable() {
- @Override public void run() {
- send(endpoint, request, fut, timeoutMs);
- }
- }});
+ request,
+ endpoint.toString(),
+ fut.hashCode(),
+ System.currentTimeMillis(),
+ (Runnable)() -> send(endpoint, request, fut, timeoutMs)
+ });
return fut;
}
@@ -119,15 +126,14 @@ public class IgniteRpcClient implements RpcClientEx {
}
public void send(Endpoint endpoint, Object request, CompletableFuture<Message> fut, long timeout) {
- CompletableFuture<NetworkMessage> fut0 = service.messagingService().invoke(endpoint.toString(), (NetworkMessage) request, timeout);
-
- fut0.whenComplete(new BiConsumer<NetworkMessage, Throwable>() {
- @Override public void accept(NetworkMessage resp, Throwable err) {
- if (err != null)
- fut.completeExceptionally(err);
- else
- fut.complete((Message) resp);
- }
+ CompletableFuture<NetworkMessage> fut0 = service.messagingService()
+ .invoke(addressFromEndpoint(endpoint), (NetworkMessage) request, timeout);
+
+ fut0.whenComplete((resp, err) -> {
+ if (err != null)
+ fut.completeExceptionally(err);
+ else
+ fut.complete((Message) resp);
});
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 5492264..cb2da9d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
@@ -146,11 +147,11 @@ public class IgniteRpcServer implements RpcServer<Void> {
service.messagingService().send(senderAddr, (NetworkMessage) responseObj, corellationId);
}
- @Override public String getRemoteAddress() {
+ @Override public NetworkAddress getRemoteAddress() {
return senderAddr;
}
- @Override public String getLocalAddress() {
+ @Override public NetworkAddress getLocalAddress() {
return service.topologyService().localMember().address();
}
};
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java
index c73df6d..7f80b80 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/ActionRequestProcessor.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
@@ -27,6 +28,7 @@ import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.message.ActionRequest;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.message.RaftErrorResponseBuilder;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.jraft.Closure;
@@ -40,13 +42,15 @@ import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+
+import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
/**
* Process action request.
*/
public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
private final Executor executor;
+
private final RaftClientMessagesFactory factory;
public ActionRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
@@ -56,7 +60,7 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
/** {@inheritDoc} */
@Override public void handleRequest(RpcContext rpcCtx, ActionRequest request) {
- Node node = rpcCtx.getNodeManager().get(request.groupId(), PeerId.parsePeer(rpcCtx.getLocalAddress()));
+ Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalAddress()));
if (node == null) {
rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.ILLEGAL_STATE).build());
@@ -87,9 +91,9 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
(JRaftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
try {
- fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<ReadCommand>() {
+ fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<>() {
@Override public ReadCommand command() {
- return (ReadCommand) request.command();
+ return (ReadCommand)request.command();
}
@Override public void result(Serializable res) {
@@ -112,9 +116,9 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
(JRaftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
try {
- fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<ReadCommand>() {
+ fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<>() {
@Override public ReadCommand command() {
- return (ReadCommand) request.command();
+ return (ReadCommand)request.command();
}
@Override public void result(Serializable res) {
@@ -140,15 +144,17 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
}
/**
- * @param node The node.
- * @param corellationId Corellation id.
+ * @param ctx Context.
* @param errorCode Error code.
- * @param newLeader New leader.
+ * @param msg Message.
*/
private void sendError(RpcContext ctx, RaftErrorCode errorCode, String msg) {
- RaftErrorResponseBuilder resp = factory.raftErrorResponse().errorCode(errorCode).errorMessage(msg);
+ RaftErrorResponse resp = factory.raftErrorResponse()
+ .errorCode(errorCode)
+ .errorMessage(msg)
+ .build();
- ctx.sendResponse(((RaftErrorResponseBuilder) resp).build());
+ ctx.sendResponse(resp);
}
/**
@@ -176,11 +182,10 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
RaftErrorResponseBuilder resp =
factory.raftErrorResponse().errorCode(raftErrorCode).errorMessage(status.getErrorMsg());
- if (newLeader != null) {
- resp.newLeader(new Peer(newLeader.getEndpoint().toString()));
- }
+ if (newLeader != null)
+ resp.newLeader(new Peer(addressFromEndpoint(newLeader.getEndpoint())));
- ctx.sendResponse(((RaftErrorResponseBuilder) resp).build());
+ ctx.sendResponse(resp.build());
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java
index 9d4329c..9c025e3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/GetLeaderRequestProcessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.rpc.impl.client;
import java.util.concurrent.Executor;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.message.GetLeaderRequest;
@@ -26,11 +27,14 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
+import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
+
/**
* Process get leader request.
*/
public class GetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
private final Executor executor;
+
private final RaftClientMessagesFactory factory;
public GetLeaderRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
@@ -40,7 +44,9 @@ public class GetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest>
/** {@inheritDoc} */
@Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
- Node node = rpcCtx.getNodeManager().get(request.groupId(), PeerId.parsePeer(rpcCtx.getLocalAddress()));
+ NetworkAddress localAddr = rpcCtx.getLocalAddress();
+
+ Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(localAddr.host(), localAddr.port()));
if (node == null) {
rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.ILLEGAL_STATE).build());
@@ -57,7 +63,7 @@ public class GetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest>
}
// Find by host and port.
- Peer leader0 = new Peer(leaderId.getEndpoint().toString());
+ Peer leader0 = new Peer(addressFromEndpoint(leaderId.getEndpoint()));
rpcCtx.sendResponse(factory.getLeaderResponse().leader(leader0).build());
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java
index 19db42e..2c8d25f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/client/SnapshotRequestProcessor.java
@@ -21,9 +21,7 @@ import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.message.RaftErrorResponseBuilder;
import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Node;
-import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
@@ -33,6 +31,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
*/
public class SnapshotRequestProcessor implements RpcProcessor<SnapshotRequest> {
private final Executor executor;
+
private final RaftClientMessagesFactory factory;
public SnapshotRequestProcessor(Executor executor, RaftClientMessagesFactory factory) {
@@ -42,7 +41,7 @@ public class SnapshotRequestProcessor implements RpcProcessor<SnapshotRequest> {
/** {@inheritDoc} */
@Override public void handleRequest(RpcContext rpcCtx, SnapshotRequest request) {
- Node node = rpcCtx.getNodeManager().get(request.groupId(), PeerId.parsePeer(rpcCtx.getLocalAddress()));
+ Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalAddress()));
if (node == null) {
rpcCtx.sendResponse(factory.raftErrorResponse().errorCode(RaftErrorCode.ILLEGAL_STATE).build());
@@ -50,16 +49,13 @@ public class SnapshotRequestProcessor implements RpcProcessor<SnapshotRequest> {
return;
}
- node.snapshot(new Closure() {
- @Override public void run(Status status) {
- RaftErrorResponseBuilder resp = factory.raftErrorResponse();
+ node.snapshot(status -> {
+ RaftErrorResponseBuilder resp = factory.raftErrorResponse();
- if (!status.isOk()) {
- resp.errorCode(RaftErrorCode.SNAPSHOT).errorMessage(status.getErrorMsg());
- }
+ if (!status.isOk())
+ resp.errorCode(RaftErrorCode.SNAPSHOT).errorMessage(status.getErrorMsg());
- rpcCtx.sendResponse(resp.build());
- }
+ rpcCtx.sendResponse(resp.build());
});
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 6dd2f90..a5578bd 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
@@ -203,11 +204,15 @@ public class TestCluster {
final MockStateMachine fsm = new MockStateMachine(listenAddr);
nodeOptions.setFsm(fsm);
- if (!emptyPeers) {
+ if (!emptyPeers)
nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
- }
- List<String> servers = emptyPeers ? List.of() : this.peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+ List<NetworkAddress> servers = emptyPeers ?
+ List.of() :
+ this.peers.stream()
+ .map(PeerId::getEndpoint)
+ .map(JRaftUtils::addressFromEndpoint)
+ .collect(Collectors.toList());
NodeManager nodeManager = new NodeManager();
@@ -249,7 +254,7 @@ public class TestCluster {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+ private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
var registry = new TestMessageSerializationRegistryImpl();
var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 6c6abe4..b1e0133 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -22,12 +22,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
+
/**
*
*/
@@ -35,8 +38,9 @@ public class IgniteRpcTest extends AbstractRpcTest {
/** The counter. */
private final AtomicInteger cntr = new AtomicInteger();
+ /** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
- ClusterService service = createService(endpoint.toString(), endpoint.getPort(), List.of());
+ ClusterService service = createService(endpoint.toString(), endpoint.getPort());
var server = new TestIgniteRpcServer(service, List.of(), new NodeManager(), new NodeOptions()) {
@Override public void shutdown() {
@@ -55,7 +59,7 @@ public class IgniteRpcTest extends AbstractRpcTest {
@Override public RpcClient createClient0() {
int i = cntr.incrementAndGet();
- ClusterService service = createService("client" + i, endpoint.getPort() - i, List.of(endpoint.toString()));
+ ClusterService service = createService("client" + i, endpoint.getPort() - i, addressFromEndpoint(endpoint));
IgniteRpcClient client = new IgniteRpcClient(service) {
@Override public void shutdown() {
@@ -78,9 +82,9 @@ public class IgniteRpcTest extends AbstractRpcTest {
* @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- private static ClusterService createService(String name, int port, List<String> servers) {
+ private static ClusterService createService(String name, int port, NetworkAddress... servers) {
var registry = new MessageSerializationRegistryImpl();
- var context = new ClusterLocalConfiguration(name, port, servers, registry);
+ var context = new ClusterLocalConfiguration(name, port, List.of(servers), registry);
var factory = new TestScaleCubeClusterServiceFactory();
return factory.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 2dd9fa0..406df62 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.rpc;
import java.util.List;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
@@ -32,10 +33,15 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
/**
* @param clusterService Cluster service.
* @param servers Server list.
- * @param nodeManager Node manager
+ * @param nodeManager Node manager.
* @param nodeOptions Node options.
*/
- public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager, NodeOptions nodeOptions) {
+ public TestIgniteRpcServer(
+ ClusterService clusterService,
+ List<NetworkAddress> servers,
+ NodeManager nodeManager,
+ NodeOptions nodeOptions
+ ) {
super(
clusterService,
nodeManager,
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
index cdd63da..9de6606 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.test;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
@@ -47,11 +48,11 @@ public class MockAsyncContext implements RpcContext {
this.responseObject = responseObject;
}
- @Override public String getRemoteAddress() {
- return "localhost:12345";
+ @Override public NetworkAddress getRemoteAddress() {
+ return new NetworkAddress("localhost", 12345);
}
- @Override public String getLocalAddress() {
- return "localhost:8081";
+ @Override public NetworkAddress getLocalAddress() {
+ return new NetworkAddress("localhost", 8081);
}
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 1e255e6..f966c6b 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -17,11 +17,12 @@
package org.apache.ignite.internal.app;
-import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
+import io.netty.util.internal.StringUtil;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
import org.apache.ignite.configuration.RootKey;
@@ -48,6 +49,7 @@ import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
@@ -124,12 +126,16 @@ public class IgnitionImpl implements Ignition {
var serializationRegistry = new MessageSerializationRegistryImpl();
+ List<NetworkAddress> peers = Arrays.stream(netConfigurationView.netClusterNodes())
+ .map(NetworkAddress::from)
+ .collect(Collectors.toUnmodifiableList());
+
// Network startup.
ClusterService clusterNetSvc = new ScaleCubeClusterServiceFactory().createClusterService(
new ClusterLocalConfiguration(
nodeName,
netConfigurationView.port(),
- Arrays.asList(netConfigurationView.netClusterNodes()),
+ peers,
serializationRegistry
)
);
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index 41c729f..12abf6b 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -103,6 +103,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-network</artifactId>
<scope>test</scope>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 1cf219f..e3dd181 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
@@ -111,24 +112,19 @@ public class ITDistributedTableTest {
*/
@BeforeEach
public void beforeTest() {
- for (int i = 0; i < NODES; i++) {
- cluster.add(startClient(
- "node_" + i,
- NODE_PORT_BASE + i,
- IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES).boxed().map((port) -> "localhost:" + port).collect(Collectors.toList())
- ));
- }
+ List<NetworkAddress> allNodes = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
+ .mapToObj(port -> new NetworkAddress("localhost", port))
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < NODES; i++)
+ cluster.add(startClient("node_" + i, NODE_PORT_BASE + i, allNodes));
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
LOG.info("Cluster started.");
- client = startClient(
- "client",
- NODE_PORT_BASE + NODES,
- IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES).boxed().map((port) -> "localhost:" + port).collect(Collectors.toList())
- );
+ client = startClient("client", NODE_PORT_BASE + NODES, allNodes);
assertTrue(waitForTopology(client, NODES + 1, 1000));
@@ -499,7 +495,7 @@ public class ITDistributedTableTest {
* @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- private ClusterService startClient(String name, int port, List<String> servers) {
+ private ClusterService startClient(String name, int port, List<NetworkAddress> servers) {
var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
network.start();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 1c1abbd..64322d7 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.SchemaTable;
@@ -63,7 +64,10 @@ import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.ReflectionUtils;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -82,6 +86,7 @@ import static org.mockito.Mockito.when;
/**
* Tests scenarios for table manager.
*/
+@ExtendWith(MockitoExtension.class)
public class TableManagerTest {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManagerTest.class);
@@ -102,17 +107,41 @@ public class TableManagerTest {
private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "t3";
/** Table partitions. */
- public static final int PARTITIONS = 32;
-
- /** Node port. */
- public static final int PORT = 2245;
+ private static final int PARTITIONS = 32;
/** Node name. */
- public static final String NODE_NAME = "node1";
+ private static final String NODE_NAME = "node1";
/** Configuration manager. */
private ConfigurationManager cfrMgr;
+ /** MetaStorage manager. */
+ @Mock(lenient = true)
+ private MetaStorageManager mm;
+
+ /** Schema manager. */
+ @Mock(lenient = true)
+ private SchemaManager sm;
+
+ /** Affinity manager. */
+ @Mock(lenient = true)
+ private AffinityManager am;
+
+ /** Raft manager. */
+ @Mock(lenient = true)
+ private Loza rm;
+
+ /** Vault manager. */
+ @Mock(lenient = true)
+ private VaultManager vm;
+
+ /** Test node. */
+ private final ClusterNode node = new ClusterNode(
+ UUID.randomUUID().toString(),
+ NODE_NAME,
+ new NetworkAddress("127.0.0.1", 2245)
+ );
+
/** Before each test scenario. */
@BeforeEach
private void before() {
@@ -181,12 +210,6 @@ public class TableManagerTest {
@Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
@Test
public void testStaticTableConfigured() {
- MetaStorageManager mm = mock(MetaStorageManager.class);
- SchemaManager sm = mock(SchemaManager.class);
- AffinityManager am = mock(AffinityManager.class);
- Loza rm = mock(Loza.class);
- VaultManager vm = mock(VaultManager.class);
-
TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
assertEquals(1, tableManager.tables().size());
@@ -199,14 +222,6 @@ public class TableManagerTest {
*/
@Test
public void testCreateTable() {
- MetaStorageManager mm = mock(MetaStorageManager.class);
- SchemaManager sm = mock(SchemaManager.class);
- AffinityManager am = mock(AffinityManager.class);
- Loza rm = mock(Loza.class);
- VaultManager vm = mock(VaultManager.class);
-
- ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
-
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
@@ -214,7 +229,7 @@ public class TableManagerTest {
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
- Table table = mockManagersAndCreateTable(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut);
+ Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
assertNotNull(table);
@@ -226,14 +241,6 @@ public class TableManagerTest {
*/
@Test
public void testDropTable() {
- MetaStorageManager mm = mock(MetaStorageManager.class);
- SchemaManager sm = mock(SchemaManager.class);
- AffinityManager am = mock(AffinityManager.class);
- Loza rm = mock(Loza.class);
- VaultManager vm = mock(VaultManager.class);
-
- ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
-
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
@@ -241,14 +248,14 @@ public class TableManagerTest {
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
- TableImpl table = mockManagersAndCreateTable(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut);
+ TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
TableManager tableManager = tblManagerFut.join();
when(sm.unregisterSchemas(any())).thenReturn(CompletableFuture.completedFuture(true));
- doAnswer(invokation -> {
- EventListener<SchemaEventParameters> schemaInitialized = invokation.getArgument(1);
+ doAnswer(invocation -> {
+ EventListener<SchemaEventParameters> schemaInitialized = invocation.getArgument(1);
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
@@ -261,8 +268,8 @@ public class TableManagerTest {
when(am.removeAssignment(any())).thenReturn(CompletableFuture.completedFuture(true));
- doAnswer(invokation -> {
- EventListener<AffinityEventParameters> affinityRemovedDelegate = invokation.getArgument(1);
+ doAnswer(invocation -> {
+ EventListener<AffinityEventParameters> affinityRemovedDelegate = invocation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
@@ -286,14 +293,6 @@ public class TableManagerTest {
*/
@Test
public void testGetTableDuringCreation() throws Exception {
- MetaStorageManager mm = mock(MetaStorageManager.class);
- SchemaManager sm = mock(SchemaManager.class);
- AffinityManager am = mock(AffinityManager.class);
- Loza rm = mock(Loza.class);
- VaultManager vm = mock(VaultManager.class);
-
- ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
-
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
@@ -304,7 +303,7 @@ public class TableManagerTest {
Phaser phaser = new Phaser(2);
CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
- mockManagersAndCreateTableWithDelay(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut, phaser)
+ mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
);
CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
@@ -334,50 +333,26 @@ public class TableManagerTest {
* Instantiates Table manager and creates a table in it.
*
* @param schemaTable Configuration schema for a table.
- * @param mm Metastorage manager mock.
- * @param sm Schema manager mock.
- * @param am Affinity manager mock.
- * @param rm Raft manager mock.
- * @param vm Vault manager mock.
- * @param node This cluster node.
* @param tblManagerFut Future for table manager.
* @return Table.
*/
private TableImpl mockManagersAndCreateTable(
SchemaTable schemaTable,
- MetaStorageManager mm,
- SchemaManager sm,
- AffinityManager am,
- Loza rm,
- VaultManager vm,
- ClusterNode node,
CompletableFuture<TableManager> tblManagerFut
) {
- return mockManagersAndCreateTableWithDelay(schemaTable, mm, sm, am, rm, vm, node, tblManagerFut, null);
+ return mockManagersAndCreateTableWithDelay(schemaTable, tblManagerFut, null);
}
/**
* Instantiates a table and prepares Table manager. The method completes once the phaser (when one is supplied) allows it to proceed.
*
* @param schemaTable Configuration schema for a table.
- * @param mm Metastorage manager mock.
- * @param sm Schema manager mock.
- * @param am Affinity manager mock.
- * @param rm Raft manager mock.
- * @param vm Vault manager mock.
- * @param node This cluster node.
* @param tblManagerFut Future for table manager.
- * @param barrier Phaser for the wait.
+ * @param phaser Phaser for the wait.
* @return Table.
*/
@NotNull private TableImpl mockManagersAndCreateTableWithDelay(
SchemaTable schemaTable,
- MetaStorageManager mm,
- SchemaManager sm,
- AffinityManager am,
- Loza rm,
- VaultManager vm,
- ClusterNode node,
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
) {
@@ -385,8 +360,8 @@ public class TableManagerTest {
CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
- when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invokation -> {
- Condition condition = (Condition)invokation.getArgument(0);
+ when(mm.invoke(any(Condition.class), any(Operation.class), any(Operation.class))).thenAnswer(invocation -> {
+ Condition condition = invocation.getArgument(0);
Object internalCondition = ReflectionUtils.tryToReadFieldValue(Condition.class, "cond", condition).get();
@@ -402,8 +377,8 @@ public class TableManagerTest {
when(sm.initSchemaForTable(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
- doAnswer(invokation -> {
- EventListener<SchemaEventParameters> schemaInitialized = invokation.getArgument(1);
+ doAnswer(invocation -> {
+ EventListener<SchemaEventParameters> schemaInitialized = invocation.getArgument(1);
assertTrue(tblIdFut.isDone());
@@ -418,8 +393,8 @@ public class TableManagerTest {
when(am.calculateAssignments(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
- doAnswer(invokation -> {
- EventListener<AffinityEventParameters> affinityClaculatedDelegate = invokation.getArgument(1);
+ doAnswer(invocation -> {
+ EventListener<AffinityEventParameters> affinityClaculatedDelegate = invocation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
@@ -442,7 +417,7 @@ public class TableManagerTest {
when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
.thenReturn(CompletableFuture.completedFuture(null));
- when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invokation -> {
+ when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
Cursor<Entry> cursor = mock(Cursor.class);
when(cursor.hasNext()).thenReturn(false);
@@ -463,14 +438,14 @@ public class TableManagerTest {
return CompletableFuture.completedFuture(null);
when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
- .thenAnswer(invokation -> CompletableFuture.completedFuture(createTbl ? mock(Entry.class) : null));
+ .thenAnswer(invocation -> CompletableFuture.completedFuture(createTbl ? mock(Entry.class) : null));
- when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invokation -> {
+ when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
Cursor<Entry> cursor = mock(Cursor.class);
- when(cursor.hasNext()).thenAnswer(hasNextInvokation ->
+ when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
firstRecord.compareAndSet(true, false));
Entry mockEntry = mock(Entry.class);