You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 12:19:35 UTC
[ignite-3] 04/04: IGNITE-13885 Raft client wip 2.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit e78980e90edcc6d71e376f55483740371afdf044
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 15:19:14 2021 +0300
IGNITE-13885 Raft client wip 2.
---
modules/raft-client/pom.xml | 6 +
.../main/java/org/apache/ignite/raft/Closure.java | 29 ++
.../java/org/apache/ignite/raft/Configuration.java | 276 +++++++++++++
.../org/apache/ignite/raft/ElectionPriority.java | 37 ++
.../main/java/org/apache/ignite/raft/Endpoint.java | 89 +++++
.../java/org/apache/ignite/raft/Lifecycle.java | 39 ++
.../main/java/org/apache/ignite/raft/PeerId.java | 247 ++++++++++++
.../java/org/apache/ignite/raft/RaftError.java | 284 +++++++++++++
.../main/java/org/apache/ignite/raft/Status.java | 225 +++++++++++
.../java/org/apache/ignite/raft/StringUtils.java | 407 +++++++++++++++++++
.../ignite/raft/closure/RpcResponseClosure.java | 34 ++
.../raft/closure/RpcResponseClosureAdapter.java | 37 ++
.../org/apache/ignite/raft/rpc/CliRequests.java | 439 +++++++++++++++++++++
.../org/apache/ignite/raft/rpc/Connection.java | 53 +++
.../raft/rpc/ConnectionClosedEventListener.java | 24 ++
.../org/apache/ignite/raft/rpc/InvokeCallback.java | 30 ++
.../org/apache/ignite/raft/rpc/InvokeContext.java | 51 +++
.../ignite/raft/rpc/InvokeTimeoutException.java | 43 ++
.../java/org/apache/ignite/raft/rpc/Message.java | 11 +
.../ignite/raft/rpc/MessageBuilderFactory.java | 50 +++
.../apache/ignite/raft/rpc/NamedThreadFactory.java | 65 +++
.../apache/ignite/raft/rpc/RemotingException.java | 43 ++
.../java/org/apache/ignite/raft/rpc/RpcClient.java | 105 +++++
.../org/apache/ignite/raft/rpc/RpcContext.java | 44 +++
.../org/apache/ignite/raft/rpc/RpcOptions.java | 25 ++
.../org/apache/ignite/raft/rpc/RpcProcessor.java | 52 +++
.../org/apache/ignite/raft/rpc/RpcRequests.java | 61 +++
.../java/org/apache/ignite/raft/rpc/RpcServer.java | 46 +++
.../java/org/apache/ignite/raft/rpc/RpcUtils.java | 111 ++++++
.../ignite/raft}/rpc/impl/LocalConnection.java | 17 +-
.../ignite/raft/rpc/impl/LocalRpcClient.java | 125 ++++++
.../ignite/raft}/rpc/impl/LocalRpcServer.java | 59 +--
.../raft/rpc/message/AddLearnersRequestImpl.java | 53 +++
.../rpc/message/CreateGetLeaderRequestImpl.java | 37 ++
.../rpc/message/CreateGetLeaderResponseImpl.java | 22 ++
.../rpc/message/DefaultMessageBuilderFactory.java | 86 ++++
.../ignite/raft/rpc/message/ErrorResponseImpl.java | 25 ++
.../raft/rpc/message/LearnersOpResponseImpl.java | 50 +++
.../ignite/raft/rpc/message/PingRequestImpl.java | 21 +
.../rpc/message/RemoveLearnersRequestImpl.java | 53 +++
.../raft/rpc/message/ResetLearnersRequestImpl.java | 53 +++
.../raft/rpc/message/SnapshotRequestImpl.java | 32 ++
.../rpc/message/TransferLeaderRequestImpl.java | 47 +++
.../ignite/raft/service/AbstractClientService.java | 237 +++++++++++
.../ignite/raft/service/CliClientService.java | 153 +++++++
.../ignite/raft/service/CliClientServiceImpl.java | 111 ++++++
.../apache/ignite/raft/service/ClientService.java | 75 ++++
.../org/apache/ignite/raft/service/RouteTable.java | 316 +++++++++++++++
.../org/apache/ignite/raft/RaftClientTest.java | 39 ++
.../org/apache/ignite/raft/rpc/LocalRpcTest.java | 321 +++++++++++++++
.../sofa/jraft/rpc/impl/LocalConnection.java | 2 +-
.../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 4 +-
52 files changed, 4847 insertions(+), 54 deletions(-)
diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index 9286853..01e0c6d 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -34,6 +34,12 @@
<artifactId>ignite-raft-client</artifactId>
<dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java
new file mode 100644
index 0000000..e2126fc
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Callback closure.
+ */
+public interface Closure {
+ /**
+ * Called when task is done.
+ *
+ * @param status the execution status.
+ */
+ void run(final Status status);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java
new file mode 100644
index 0000000..5a54ce5
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A configuration with a set of peers and/or learners.
+ */
+public class Configuration implements Iterable<PeerId> {
+ private static final String LEARNER_POSTFIX = "/learner";
+
+ private List<PeerId> peers = new ArrayList<>();
+
+ // use LinkedHashSet to keep insertion order.
+ private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
+
+ public Configuration() {
+ super();
+ }
+
+ /**
+ * Construct a configuration instance with peers.
+ *
+ * @param conf configuration
+ */
+ public Configuration(final Iterable<PeerId> conf) {
+ this(conf, null);
+ }
+
+ /**
+ * Construct a configuration from another conf.
+ *
+ * @param conf configuration
+ */
+ public Configuration(final Configuration conf) {
+ this(conf.getPeers(), conf.getLearners());
+ }
+
+ /**
+ * Construct a Configuration instance with peers and learners.
+ *
+ * @param conf peers configuration
+ * @param learners learners
+ * @since 1.3.0
+ */
+ public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
+ for (final PeerId peer : conf) {
+ this.peers.add(new PeerId(peer));
+ }
+ addLearners(learners);
+ }
+
+ public void setLearners(final LinkedHashSet<PeerId> learners) {
+ this.learners = learners;
+ }
+
+ /**
+ * Add a learner peer.
+ *
+ * @param learner learner to add
+ * @return true when add successfully.
+ */
+ public boolean addLearner(final PeerId learner) {
+ return this.learners.add(learner);
+ }
+
+ /**
+ * Add learners in batch, returns the added count.
+ *
+ * @param learners learners to add
+ * @return the total added count
+ */
+ public int addLearners(final Iterable<PeerId> learners) {
+ int ret = 0;
+ if (learners != null) {
+ for (final PeerId peer : learners) {
+ if (this.learners.add(new PeerId(peer))) {
+ ret++;
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Remove a learner peer.
+ *
+ * @param learner learner to remove
+ * @return true when remove successfully.
+ */
+ public boolean removeLearner(final PeerId learner) {
+ return this.learners.remove(learner);
+ }
+
+ /**
+ * Retrieve the learners set.
+ *
+ * @return learners
+ */
+ public LinkedHashSet<PeerId> getLearners() {
+ return this.learners;
+ }
+
+ /**
+ * Retrieve the learners set copy.
+ *
+ * @return learners
+ */
+ public List<PeerId> listLearners() {
+ return new ArrayList<>(this.learners);
+ }
+
+ /**
+ * Returns true when the configuration is valid.
+ *
+ * @return true if the configuration is valid.
+ */
+ public boolean isValid() {
+ final Set<PeerId> intersection = new HashSet<>(this.peers);
+ intersection.retainAll(this.learners);
+ return !this.peers.isEmpty() && intersection.isEmpty();
+ }
+
+ public void reset() {
+ this.peers.clear();
+ this.learners.clear();
+ }
+
+ public boolean isEmpty() {
+ return this.peers.isEmpty();
+ }
+
+ /**
+ * Returns the peers total number.
+ *
+ * @return total num of peers
+ */
+ public int size() {
+ return this.peers.size();
+ }
+
+ @Override
+ public Iterator<PeerId> iterator() {
+ return this.peers.iterator();
+ }
+
+ public Set<PeerId> getPeerSet() {
+ return new HashSet<>(this.peers);
+ }
+
+ public List<PeerId> listPeers() {
+ return new ArrayList<>(this.peers);
+ }
+
+ public List<PeerId> getPeers() {
+ return this.peers;
+ }
+
+ public void setPeers(final List<PeerId> peers) {
+ this.peers.clear();
+ for (final PeerId peer : peers) {
+ this.peers.add(new PeerId(peer));
+ }
+ }
+
+ public void appendPeers(final Collection<PeerId> set) {
+ this.peers.addAll(set);
+ }
+
+ public boolean addPeer(final PeerId peer) {
+ return this.peers.add(peer);
+ }
+
+ public boolean removePeer(final PeerId peer) {
+ return this.peers.remove(peer);
+ }
+
+ public boolean contains(final PeerId peer) {
+ return this.peers.contains(peer);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((this.learners == null) ? 0 : this.learners.hashCode());
+ result = prime * result + ((this.peers == null) ? 0 : this.peers.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Configuration other = (Configuration) obj;
+ if (this.learners == null) {
+ if (other.learners != null) {
+ return false;
+ }
+ } else if (!this.learners.equals(other.learners)) {
+ return false;
+ }
+ if (this.peers == null) {
+ return other.peers == null;
+ } else {
+ return this.peers.equals(other.peers);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ final List<PeerId> peers = listPeers();
+ int i = 0;
+ int size = peers.size();
+ for (final PeerId peer : peers) {
+ sb.append(peer);
+ if (i < size - 1 || !this.learners.isEmpty()) {
+ sb.append(",");
+ }
+ i++;
+ }
+
+ size = this.learners.size();
+ i = 0;
+ for (final PeerId peer : this.learners) {
+ sb.append(peer).append(LEARNER_POSTFIX);
+ if (i < size - 1) {
+ sb.append(",");
+ }
+ i++;
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Get the difference between |*this| and |rhs|
+ * |included| would be assigned to |*this| - |rhs|
+ * |excluded| would be assigned to |rhs| - |*this|
+ */
+ public void diff(final Configuration rhs, final Configuration included, final Configuration excluded) {
+ included.peers = new ArrayList<>(this.peers);
+ included.peers.removeAll(rhs.peers);
+ excluded.peers = new ArrayList<>(rhs.peers);
+ excluded.peers.removeAll(this.peers);
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
new file mode 100644
index 0000000..045fb30
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Election Priority. TODO asch enum ?
+ */
+public class ElectionPriority {
+ /**
+ * Priority -1 represents this node disabled the priority election function.
+ */
+ public static final int DISABLED = -1;
+
+ /**
+ * Priority 0 is a special value so that a node will never participate in election.
+ */
+ public static final int NOT_ELECTABLE = 0;
+
+ /**
+ * Priority 1 is a minimum value for priority election.
+ */
+ public static final int MIN_VALUE = 1;
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java
new file mode 100644
index 0000000..ae7c4b6
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.io.Serializable;
+
+/**
+ * A IP address with port.
+ */
+public class Endpoint implements Serializable {
+ public static final String IP_ANY = "0.0.0.0";
+
+ private static final long serialVersionUID = -7329681263115546100L;
+
+ private String ip = IP_ANY;
+ private int port;
+ private String str;
+
+ public Endpoint() {
+ super();
+ }
+
+ public Endpoint(String address, int port) {
+ super();
+ this.ip = address;
+ this.port = port;
+ }
+
+ public String getIp() {
+ return this.ip;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ @Override
+ public String toString() {
+ if (str == null) {
+ str = this.ip + ":" + this.port;
+ }
+ return str;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (this.ip == null ? 0 : this.ip.hashCode());
+ result = prime * result + this.port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final Endpoint other = (Endpoint) obj;
+ if (this.ip == null) {
+ if (other.ip != null) {
+ return false;
+ }
+ } else if (!this.ip.equals(other.ip)) {
+ return false;
+ }
+ return this.port == other.port;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java
new file mode 100644
index 0000000..b6a385d
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Service life cycle mark interface.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Mar-12 3:47:04 PM
+ */
+public interface Lifecycle<T> {
+
+ /**
+ * Initialize the service.
+ *
+ * @return true when successes.
+ */
+ boolean init(final T opts);
+
+ /**
+ * Dispose the resources for service.
+ */
+ void shutdown();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
new file mode 100644
index 0000000..c31f3d1
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.io.Serializable;
+
+/**
+ * Represent a participant in a replicating group.
+ */
+public class PeerId implements Serializable {
+ private static final long serialVersionUID = 8083529734784884641L;
+
+ /**
+ * Peer address.
+ */
+ private Endpoint endpoint = new Endpoint(Endpoint.IP_ANY, 0);
+
+ /**
+ * Index in same addr, default is 0.
+ */
+ private int idx; // TODO FIXME idx can be removed.
+ /**
+ * Cached toString result.
+ */
+ private String str;
+
+ /**
+ * Node's local priority value, if node don't support priority election, this value is -1.
+ */
+ private int priority = ElectionPriority.DISABLED;
+
+ public static final PeerId ANY_PEER = new PeerId();
+
+ private long checksum;
+
+ public PeerId(PeerId peer) {
+ this.endpoint = peer.getEndpoint();
+ this.idx = peer.getIdx();
+ this.priority = peer.getPriority();
+ }
+
+ public PeerId() {
+ // No-op.
+ }
+
+ /**
+ * Create an empty peer.
+ *
+ * @return empty peer
+ */
+ public static PeerId emptyPeer() {
+ return new PeerId();
+ }
+
+ public PeerId(final Endpoint endpoint, final int idx) {
+ super();
+ this.endpoint = endpoint;
+ this.idx = idx;
+ }
+
+ public PeerId(final String ip, final int port) {
+ this(ip, port, 0);
+ }
+
+ public PeerId(final String ip, final int port, final int idx) {
+ super();
+ this.endpoint = new Endpoint(ip, port);
+ this.idx = idx;
+ }
+
+ public PeerId(final Endpoint endpoint, final int idx, final int priority) {
+ super();
+ this.endpoint = endpoint;
+ this.idx = idx;
+ this.priority = priority;
+ }
+
+ public PeerId(final String ip, final int port, final int idx, final int priority) {
+ super();
+ this.endpoint = new Endpoint(ip, port);
+ this.idx = idx;
+ this.priority = priority;
+ }
+
+ public Endpoint getEndpoint() {
+ return this.endpoint;
+ }
+
+ public String getIp() {
+ return this.endpoint.getIp();
+ }
+
+ public int getPort() {
+ return this.endpoint.getPort();
+ }
+
+ public int getIdx() {
+ return this.idx;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ this.str = null;
+ }
+
+ /**
+ * Returns true when ip is ANY_IP, port is zero and idx is zero too.
+ */
+ public boolean isEmpty() {
+ return getIp().equals(Endpoint.IP_ANY) && getPort() == 0 && this.idx == 0;
+ }
+
+ @Override
+ public String toString() {
+ if (this.str == null) {
+ final StringBuilder buf = new StringBuilder(this.endpoint.toString());
+
+ if (this.idx != 0) {
+ buf.append(':').append(this.idx);
+ }
+
+ if (this.priority != ElectionPriority.DISABLED) {
+ if (this.idx == 0) {
+ buf.append(':');
+ }
+ buf.append(':').append(this.priority);
+ }
+
+ this.str = buf.toString();
+ }
+ return this.str;
+ }
+
+ /**
+ * To judge whether this node can participate in election or not.
+ *
+ * @return the restul that whether this node can participate in election or not.
+ */
+ public boolean isPriorityNotElected() {
+ return this.priority == ElectionPriority.NOT_ELECTABLE;
+ }
+
+ /**
+ * To judge whether the priority election function is disabled or not in this node.
+ *
+ * @return the result that whether this node has priority election function or not.
+ */
+ public boolean isPriorityDisabled() {
+ return this.priority <= ElectionPriority.DISABLED;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (this.endpoint == null ? 0 : this.endpoint.hashCode());
+ result = prime * result + this.idx;
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final PeerId other = (PeerId) obj;
+ if (this.endpoint == null) {
+ if (other.endpoint != null) {
+ return false;
+ }
+ } else if (!this.endpoint.equals(other.endpoint)) {
+ return false;
+ }
+ return this.idx == other.idx;
+ }
+
+ /**
+ * Parse peerId from string that generated by {@link #toString()}
+ * This method can support parameter string values are below:
+ *
+ * <pre>
+ * PeerId.parse("a:b") = new PeerId("a", "b", 0 , -1)
+ * PeerId.parse("a:b:c") = new PeerId("a", "b", "c", -1)
+ * PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d")
+ * PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d")
+ * </pre>
+ */
+ public boolean parse(final String s) {
+ if (StringUtils.isBlank(s)) {
+ return false;
+ }
+
+ final String[] tmps = StringUtils.parsePeerId(s);
+ if (tmps.length < 2 || tmps.length > 4) {
+ return false;
+ }
+ try {
+ final int port = Integer.parseInt(tmps[1]);
+ this.endpoint = new Endpoint(tmps[0], port);
+
+ switch (tmps.length) {
+ case 3:
+ this.idx = Integer.parseInt(tmps[2]);
+ break;
+ case 4:
+ if ("".equals(tmps[2])) {
+ this.idx = 0;
+ } else {
+ this.idx = Integer.parseInt(tmps[2]);
+ }
+ this.priority = Integer.parseInt(tmps[3]);
+ break;
+ default:
+ break;
+ }
+ this.str = null;
+ return true;
+ } catch (final Exception e) {
+ // LOG.error("Parse peer from string failed: {}.", s, e);
+ return false;
+ }
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java
new file mode 100644
index 0000000..a9635fc
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Raft error code.
+ */
+public enum RaftError {
+
+ /**
+ * Unknown error
+ */
+ UNKNOWN(-1),
+
+ /**
+ * Success, no error.
+ */
+ SUCCESS(0),
+
+ /**
+ * <pre>
+ * All Kinds of Timeout(Including Election_timeout, Timeout_now, Stepdown_timeout)
+ * </pre>
+ * <p>
+ * <code>ERAFTTIMEDOUT = 10001;</code>
+ */
+ ERAFTTIMEDOUT(10001),
+
+ /**
+ * <pre>
+ * Bad User State Machine
+ * </pre>
+ * <p>
+ * <code>ESTATEMACHINE = 10002;</code>
+ */
+ ESTATEMACHINE(10002),
+
+ /**
+ * <pre>
+ * Catchup Failed
+ * </pre>
+ * <p>
+ * <code>ECATCHUP = 10003;</code>
+ */
+ ECATCHUP(10003),
+
+ /**
+ * <pre>
+ * Trigger step_down(Not All)
+ * </pre>
+ * <p>
+ * <code>ELEADERREMOVED = 10004;</code>
+ */
+ ELEADERREMOVED(10004),
+
+ /**
+ * <pre>
+ * Leader Is Not In The New Configuration
+ * </pre>
+ * <p>
+ * <code>ESETPEER = 10005;</code>
+ */
+ ESETPEER(10005),
+
+ /**
+ * <pre>
+ * Shut_down
+ * </pre>
+ * <p>
+ * <code>ENODESHUTDOWN = 10006;</code>
+ */
+ ENODESHUTDOWN(10006),
+
+ /**
+ * <pre>
+ * Receive Higher Term Requests
+ * </pre>
+ * <p>
+ * <code>EHIGHERTERMREQUEST = 10007;</code>
+ */
+ EHIGHERTERMREQUEST(10007),
+
+ /**
+ * <pre>
+ * Receive Higher Term Response
+ * </pre>
+ * <p>
+ * <code>EHIGHERTERMRESPONSE = 10008;</code>
+ */
+ EHIGHERTERMRESPONSE(10008),
+
+ /**
+ * <pre>
+ * Node Is In Error
+ * </pre>
+ * <p>
+ * <code>EBADNODE = 10009;</code>
+ */
+ EBADNODE(10009),
+
+ /**
+ * <pre>
+ * Node Votes For Some Candidate
+ * </pre>
+ * <p>
+ * <code>EVOTEFORCANDIDATE = 10010;</code>
+ */
+ EVOTEFORCANDIDATE(10010),
+
+ /**
+ * <pre>
+ * Follower(without leader) or Candidate Receives
+ * </pre>
+ * <p>
+ * <code>ENEWLEADER = 10011;</code>
+ */
+ ENEWLEADER(10011),
+
+ /**
+ * <pre>
+ * Append_entries/Install_snapshot Request from a new leader
+ * </pre>
+ * <p>
+ * <code>ELEADERCONFLICT = 10012;</code>
+ */
+ ELEADERCONFLICT(10012),
+
+ /**
+ * <pre>
+ * Trigger on_leader_stop
+ * </pre>
+ * <p>
+ * <code>ETRANSFERLEADERSHIP = 10013;</code>
+ */
+ ETRANSFERLEADERSHIP(10013),
+
+ /**
+ * <pre>
+ * The log at the given index is deleted
+ * </pre>
+ * <p>
+ * <code>ELOGDELETED = 10014;</code>
+ */
+ ELOGDELETED(10014),
+
+ /**
+ * <pre>
+ * No available user log to read
+ * </pre>
+ * <p>
+ * <code>ENOMOREUSERLOG = 10015;</code>
+ */
+ ENOMOREUSERLOG(10015),
+
+ /* other non-raft error codes 1000~10000 */
+ /**
+ * Invalid rpc request
+ */
+ EREQUEST(1000),
+
+ /**
+ * Task is stopped
+ */
+ ESTOP(1001),
+
+ /**
+ * Retry again
+ */
+ EAGAIN(1002),
+
+ /**
+ * Interrupted
+ */
+ EINTR(1003),
+
+ /**
+ * Internal exception
+ */
+ EINTERNAL(1004),
+
+ /**
+ * Task is canceled
+ */
+ ECANCELED(1005),
+
+ /**
+ * Host is down
+ */
+ EHOSTDOWN(1006),
+
+ /**
+ * Service is shutdown
+ */
+ ESHUTDOWN(1007),
+
+ /**
+ * Permission issue
+ */
+ EPERM(1008),
+
+ /**
+ * Server is in busy state
+ */
+ EBUSY(1009),
+
+ /**
+ * Timed out
+ */
+ ETIMEDOUT(1010),
+
+ /**
+ * Data is stale
+ */
+ ESTALE(1011),
+
+ /**
+ * Something not found
+ */
+ ENOENT(1012),
+
+ /**
+ * File/folder already exists
+ */
+ EEXISTS(1013),
+
+ /**
+ * IO error
+ */
+ EIO(1014),
+
+ /**
+ * Invalid value.
+ */
+ EINVAL(1015),
+
+ /**
+ * Permission denied
+ */
+ EACCES(1016);
+
+ private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new HashMap<>();
+
+ static {
+ for (final RaftError error : RaftError.values()) {
+ RAFT_ERROR_MAP.put(error.getNumber(), error);
+ }
+ }
+
+ public final int getNumber() {
+ return this.value;
+ }
+
+ public static RaftError forNumber(final int value) {
+ return RAFT_ERROR_MAP.getOrDefault(value, UNKNOWN);
+ }
+
+ public static String describeCode(final int code) {
+ RaftError e = forNumber(code);
+ return e != null ? e.name() : "<Unknown:" + code + ">";
+ }
+
+ private final int value;
+
+ RaftError(final int value) {
+ this.value = value;
+ }
+}
\ No newline at end of file
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java
new file mode 100644
index 0000000..82a9aa4
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+//A Status encapsulates the result of an operation. It may indicate success,
+
+//or it may indicate an error with an associated error message. It's suitable
+//for passing status of functions with richer information than just error_code
+//in exception-forbidden code. This utility is inspired by leveldb::Status.
+//
+//Multiple threads can invoke const methods on a Status without
+//external synchronization, but if any of the threads may call a
+//non-const method, all threads accessing the same Status must use
+//external synchronization.
+//
+//Since failed status needs to allocate memory, you should be careful when
+//failed status is frequent.
+public class Status {
+
+ /**
+ * Status internal state.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Apr-03 11:17:51 AM
+ */
+ private static class State {
+ /** error code */
+ int code;
+ /** error msg*/
+ String msg;
+
+ State(int code, String msg) {
+ super();
+ this.code = code;
+ this.msg = msg;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + this.code;
+ result = prime * result + (this.msg == null ? 0 : this.msg.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ State other = (State) obj;
+ if (this.code != other.code) {
+ return false;
+ }
+ if (this.msg == null) {
+ return other.msg == null;
+ } else {
+ return this.msg.equals(other.msg);
+ }
+ }
+ }
+
+ private State state;
+
+ public Status() {
+ this.state = null;
+ }
+
+ /**
+ * Creates a OK status instance.
+ */
+ public static Status OK() {
+ return new Status();
+ }
+
+ public Status(Status s) {
+ if (s.state != null) {
+ this.state = new State(s.state.code, s.state.msg);
+ } else {
+ this.state = null;
+ }
+ }
+
+ public Status(RaftError raftError, String fmt, Object... args) {
+ this.state = new State(raftError.getNumber(), String.format(fmt, args));
+ }
+
+ public Status(int code, String fmt, Object... args) {
+ this.state = new State(code, String.format(fmt, args));
+ }
+
+ public Status(int code, String errorMsg) {
+ this.state = new State(code, errorMsg);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (this.state == null ? 0 : this.state.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Status other = (Status) obj;
+ if (this.state == null) {
+ return other.state == null;
+ } else {
+ return this.state.equals(other.state);
+ }
+ }
+
+ /**
+ * Reset status to be OK state.
+ */
+ public void reset() {
+ this.state = null;
+ }
+
+ /**
+ * Returns true when status is in OK state.
+ */
+ public boolean isOk() {
+ return this.state == null || this.state.code == 0;
+ }
+
+ /**
+ * Set error code.
+ */
+ public void setCode(int code) {
+ if (this.state == null) {
+ this.state = new State(code, null);
+ } else {
+ this.state.code = code;
+ }
+ }
+
+ /**
+ * Get error code.
+ */
+ public int getCode() {
+ return this.state == null ? 0 : this.state.code;
+ }
+
+ /**
+ * Get raft error from error code.
+ */
+ public RaftError getRaftError() {
+ return this.state == null ? RaftError.SUCCESS : RaftError.forNumber(this.state.code);
+ }
+
+ /**
+ * Set error msg
+ */
+ public void setErrorMsg(String errMsg) {
+ if (this.state == null) {
+ this.state = new State(0, errMsg);
+ } else {
+ this.state.msg = errMsg;
+ }
+ }
+
+ /**
+ * Set error code and error msg.
+ */
+ public void setError(int code, String fmt, Object... args) {
+ this.state = new State(code, String.format(String.valueOf(fmt), args));
+ }
+
+ /**
+ * Set raft error and error msg.
+ */
+ public void setError(RaftError error, String fmt, Object... args) {
+ this.state = new State(error.getNumber(), String.format(String.valueOf(fmt), args));
+ }
+
+ @Override
+ public String toString() {
+ if (isOk()) {
+ return "Status[OK]";
+ } else {
+ return "Status[" + RaftError.describeCode(this.state.code) + "<" + this.state.code + ">: " + this.state.msg
+ + "]";
+ }
+ }
+
+ /**
+ * Get the error msg.
+ */
+ public String getErrorMsg() {
+ return this.state == null ? null : this.state.msg;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java
new file mode 100644
index 0000000..8609f44
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java
@@ -0,0 +1,407 @@
+package org.apache.ignite.raft;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO asch Currently used only for parsing peer ids.
+ */
+public class StringUtils {
+ public static final String IPV6_START_MARK = "[";
+
+ public static final String IPV6_END_MARK = "]";
+
+ private static final int IPV6_ADDRESS_LENGTH = 16;
+
+ public static final String EMPTY = "";
+ public static final String[] EMPTY_STRING_ARRAY = new String[0];
+
+ public StringUtils() {
+ }
+
+ public static boolean isEmpty(CharSequence cs) {
+ return cs == null || cs.length() == 0;
+ }
+
+ public static boolean isNotEmpty(CharSequence cs) {
+ return !isEmpty(cs);
+ }
+
+ public static boolean isBlank(CharSequence cs) {
+ int strLen;
+ if (cs != null && (strLen = cs.length()) != 0) {
+ for(int i = 0; i < strLen; ++i) {
+ if (!Character.isWhitespace(cs.charAt(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ } else {
+ return true;
+ }
+ }
+
+ public static boolean isNotBlank(CharSequence cs) {
+ return !isBlank(cs);
+ }
+
+ public static String[] split(String str, char separatorChar) {
+ return splitWorker(str, separatorChar, false);
+ }
+
+ private static String[] splitWorker(String str, char separatorChar, boolean preserveAllTokens) {
+ if (str == null) {
+ return null;
+ } else {
+ int len = str.length();
+ if (len == 0) {
+ return EMPTY_STRING_ARRAY;
+ } else {
+ List<String> list = new ArrayList();
+ int i = 0;
+ int start = 0;
+ boolean match = false;
+ boolean lastMatch = false;
+
+ while(true) {
+ while(i < len) {
+ if (str.charAt(i) == separatorChar) {
+ if (match || preserveAllTokens) {
+ list.add(str.substring(start, i));
+ match = false;
+ lastMatch = true;
+ }
+
+ ++i;
+ start = i;
+ } else {
+ lastMatch = false;
+ match = true;
+ ++i;
+ }
+ }
+
+ if (match || preserveAllTokens && lastMatch) {
+ list.add(str.substring(start, i));
+ }
+
+ return (String[])list.toArray(new String[list.size()]);
+ }
+ }
+ }
+ }
+
+ public static boolean isNumeric(String str) {
+ if (str == null) {
+ return false;
+ } else {
+ int sz = str.length();
+
+ for(int i = 0; i < sz; ++i) {
+ if (!Character.isDigit(str.charAt(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+
+ public static boolean equals(String str1, String str2) {
+ return str1 == null ? str2 == null : str1.equals(str2);
+ }
+
+ /**
+ * <p>Splits the provided text into an array with a maximum length,
+ * separators specified, preserving all tokens, including empty tokens
+ * created by adjacent separators.</p>
+ *
+ * <p>The separator is not included in the returned String array.
+ * Adjacent separators are treated as separators for empty tokens.
+ * Adjacent separators are treated as one separator.</p>
+ *
+ * <p>A <code>null</code> input String returns <code>null</code>.
+ * A <code>null</code> separatorChars splits on whitespace.</p>
+ *
+ * <p>If more than <code>max</code> delimited substrings are found, the last
+ * returned string includes all characters after the first <code>max - 1</code>
+ * returned strings (including separator characters).</p>
+ *
+ * <pre>
+ * StringUtils.splitPreserveAllTokens(null, *, *) = null
+ * StringUtils.splitPreserveAllTokens("", *, *) = []
+ * StringUtils.splitPreserveAllTokens("ab de fg", null, 0) = ["ab", "cd", "ef"]
+ * StringUtils.splitPreserveAllTokens("ab de fg", null, 0) = ["ab", "cd", "ef"]
+ * StringUtils.splitPreserveAllTokens("ab:cd:ef", ":", 0) = ["ab", "cd", "ef"]
+ * StringUtils.splitPreserveAllTokens("ab:cd:ef", ":", 2) = ["ab", "cd:ef"]
+ * StringUtils.splitPreserveAllTokens("ab de fg", null, 2) = ["ab", " de fg"]
+ * StringUtils.splitPreserveAllTokens("ab de fg", null, 3) = ["ab", "", " de fg"]
+ * StringUtils.splitPreserveAllTokens("ab de fg", null, 4) = ["ab", "", "", "de fg"]
+ * </pre>
+ *
+ * @param str the String to parse, may be <code>null</code>
+ * @param separatorChars the characters used as the delimiters,
+ * <code>null</code> splits on whitespace
+ * @param max the maximum number of elements to include in the
+ * array. A zero or negative value implies no limit
+ * @return an array of parsed Strings, <code>null</code> if null String input
+ * @since 2.1
+ */
+ public static String[] splitPreserveAllTokens(String str, String separatorChars, int max) {
+ return splitWorker(str, separatorChars, max, true);
+ }
+
+ /**
+ * Performs the logic for the <code>split</code> and
+ * <code>splitPreserveAllTokens</code> methods that return a maximum array
+ * length.
+ *
+ * @param str the String to parse, may be <code>null</code>
+ * @param separatorChars the separate character
+ * @param max the maximum number of elements to include in the
+ * array. A zero or negative value implies no limit.
+ * @param preserveAllTokens if <code>true</code>, adjacent separators are
+ * treated as empty token separators; if <code>false</code>, adjacent
+ * separators are treated as one separator.
+ * @return an array of parsed Strings, <code>null</code> if null String input
+ */
+ private static String[] splitWorker(String str, String separatorChars, int max, boolean preserveAllTokens) {
+ // Performance tuned for 2.0 (JDK1.4)
+ // Direct code is quicker than StringTokenizer.
+ // Also, StringTokenizer uses isSpace() not isWhitespace()
+
+ if (str == null) {
+ return null;
+ }
+ int len = str.length();
+ if (len == 0) {
+ return EMPTY_STRING_ARRAY;
+ }
+ List list = new ArrayList();
+ int sizePlus1 = 1;
+ int i = 0, start = 0;
+ boolean match = false;
+ boolean lastMatch = false;
+ if (separatorChars == null) {
+ // Null separator means use whitespace
+ while (i < len) {
+ if (Character.isWhitespace(str.charAt(i))) {
+ if (match || preserveAllTokens) {
+ lastMatch = true;
+ if (sizePlus1++ == max) {
+ i = len;
+ lastMatch = false;
+ }
+ list.add(str.substring(start, i));
+ match = false;
+ }
+ start = ++i;
+ continue;
+ }
+ lastMatch = false;
+ match = true;
+ i++;
+ }
+ } else if (separatorChars.length() == 1) {
+ // Optimise 1 character case
+ char sep = separatorChars.charAt(0);
+ while (i < len) {
+ if (str.charAt(i) == sep) {
+ if (match || preserveAllTokens) {
+ lastMatch = true;
+ if (sizePlus1++ == max) {
+ i = len;
+ lastMatch = false;
+ }
+ list.add(str.substring(start, i));
+ match = false;
+ }
+ start = ++i;
+ continue;
+ }
+ lastMatch = false;
+ match = true;
+ i++;
+ }
+ } else {
+ // standard case
+ while (i < len) {
+ if (separatorChars.indexOf(str.charAt(i)) >= 0) {
+ if (match || preserveAllTokens) {
+ lastMatch = true;
+ if (sizePlus1++ == max) {
+ i = len;
+ lastMatch = false;
+ }
+ list.add(str.substring(start, i));
+ match = false;
+ }
+ start = ++i;
+ continue;
+ }
+ lastMatch = false;
+ match = true;
+ i++;
+ }
+ }
+ if (match || (preserveAllTokens && lastMatch)) {
+ list.add(str.substring(start, i));
+ }
+ return (String[]) list.toArray(new String[list.size()]);
+ }
+
+ /**
+ * <p>Checks if String contains a search String irrespective of case,
+ * handling <code>null</code>. Case-insensitivity is defined as by
+ * {@link String#equalsIgnoreCase(String)}.
+ *
+ * <p>A <code>null</code> String will return <code>false</code>.</p>
+ *
+ * <pre>
+ * StringUtils.contains(null, *) = false
+ * StringUtils.contains(*, null) = false
+ * StringUtils.contains("", "") = true
+ * StringUtils.contains("abc", "") = true
+ * StringUtils.contains("abc", "a") = true
+ * StringUtils.contains("abc", "z") = false
+ * StringUtils.contains("abc", "A") = true
+ * StringUtils.contains("abc", "Z") = false
+ * </pre>
+ *
+ * @param str the String to check, may be null
+ * @param searchStr the String to find, may be null
+ * @return true if the String contains the search String irrespective of
+ * case or false if not or <code>null</code> string input
+ */
+ public static boolean containsIgnoreCase(String str, String searchStr) {
+ if (str == null || searchStr == null) {
+ return false;
+ }
+ int len = searchStr.length();
+ int max = str.length() - len;
+ for (int i = 0; i <= max; i++) {
+ if (str.regionMatches(true, i, searchStr, 0, len)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // -----------------------------------------------------------------------
+ /**
+ * <p>Splits the provided text into an array, using whitespace as the
+ * separator, preserving all tokens, including empty tokens created by
+ * adjacent separators. This is an alternative to using StringTokenizer.
+ * Whitespace is defined by {@link Character#isWhitespace(char)}.</p>
+ *
+ * <p>The separator is not included in the returned String array.
+ * Adjacent separators are treated as separators for empty tokens.
+ * For more control over the split use the StrTokenizer class.</p>
+ *
+ * <p>A <code>null</code> input String returns <code>null</code>.</p>
+ *
+ * <pre>
+ * StringUtils.splitPreserveAllTokens(null) = null
+ * StringUtils.splitPreserveAllTokens("") = []
+ * StringUtils.splitPreserveAllTokens("abc def") = ["abc", "def"]
+ * StringUtils.splitPreserveAllTokens("abc def") = ["abc", "", "def"]
+ * StringUtils.splitPreserveAllTokens(" abc ") = ["", "abc", ""]
+ * </pre>
+ *
+ * @param str the String to parse, may be <code>null</code>
+ * @return an array of parsed Strings, <code>null</code> if null String input
+ * @since 2.1
+ */
+ public static String[] splitPreserveAllTokens(String str) {
+ return splitWorker(str, null, -1, true);
+ }
+
+ /**
+ * <p>Splits the provided text into an array, separator specified,
+ * preserving all tokens, including empty tokens created by adjacent
+ * separators. This is an alternative to using StringTokenizer.</p>
+ *
+ * <p>The separator is not included in the returned String array.
+ * Adjacent separators are treated as separators for empty tokens.
+ * For more control over the split use the StrTokenizer class.</p>
+ *
+ * <p>A <code>null</code> input String returns <code>null</code>.</p>
+ *
+ * <pre>
+ * StringUtils.splitPreserveAllTokens(null, *) = null
+ * StringUtils.splitPreserveAllTokens("", *) = []
+ * StringUtils.splitPreserveAllTokens("a.b.c", '.') = ["a", "b", "c"]
+ * StringUtils.splitPreserveAllTokens("a..b.c", '.') = ["a", "", "b", "c"]
+ * StringUtils.splitPreserveAllTokens("a:b:c", '.') = ["a:b:c"]
+ * StringUtils.splitPreserveAllTokens("a\tb\nc", null) = ["a", "b", "c"]
+ * StringUtils.splitPreserveAllTokens("a b c", ' ') = ["a", "b", "c"]
+ * StringUtils.splitPreserveAllTokens("a b c ", ' ') = ["a", "b", "c", ""]
+ * StringUtils.splitPreserveAllTokens("a b c ", ' ') = ["a", "b", "c", "", ""]
+ * StringUtils.splitPreserveAllTokens(" a b c", ' ') = ["", a", "b", "c"]
+ * StringUtils.splitPreserveAllTokens(" a b c", ' ') = ["", "", a", "b", "c"]
+ * StringUtils.splitPreserveAllTokens(" a b c ", ' ') = ["", a", "b", "c", ""]
+ * </pre>
+ *
+ * @param str the String to parse, may be <code>null</code>
+ * @param separatorChar the character used as the delimiter,
+ * <code>null</code> splits on whitespace
+ * @return an array of parsed Strings, <code>null</code> if null String input
+ * @since 2.1
+ */
+ public static String[] splitPreserveAllTokens(String str, char separatorChar) {
+ return splitWorker(str, separatorChar, true);
+ }
+
+ /**
+ * Parse peerId from string that generated by {@link #toString()}
+ * This method can support parameter string values are below:
+ *
+ * <pre>
+ * PeerId.parse("a:b") = new PeerId("a", "b", 0 , -1)
+ * PeerId.parse("a:b:c") = new PeerId("a", "b", "c", -1)
+ * PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d")
+ * PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d")
+ * </pre>
+ *
+ */
+ public static String[] parsePeerId(String s) {
+ if (s.startsWith(IPV6_START_MARK) && StringUtils.containsIgnoreCase(s, IPV6_END_MARK)) {
+ String ipv6Addr;
+ if (s.endsWith(IPV6_END_MARK)) {
+ ipv6Addr = s;
+ } else {
+ ipv6Addr = s.substring(0, (s.indexOf(IPV6_END_MARK) + 1));
+ }
+ if (!isIPv6(ipv6Addr)) {
+ throw new IllegalArgumentException("The IPv6 address(\"" + ipv6Addr + "\") is incorrect.");
+ }
+ String tempString = s.substring((s.indexOf(ipv6Addr) + ipv6Addr.length()));
+ if (tempString.startsWith(":")) {
+ tempString = tempString.substring(1);
+ }
+ String[] tempArr = StringUtils.splitPreserveAllTokens(tempString, ':');
+ String[] result = new String[1 + tempArr.length];
+ result[0] = ipv6Addr;
+ System.arraycopy(tempArr, 0, result, 1, tempArr.length);
+ return result;
+ } else {
+ return StringUtils.splitPreserveAllTokens(s, ':');
+ }
+ }
+
+ /**
+ * check whether the ip address is IPv6.
+ *
+ * @param addr ip address
+ * @return boolean
+ */
+ private static boolean isIPv6(String addr) {
+ try {
+ return InetAddress.getByName(addr).getAddress().length == IPV6_ADDRESS_LENGTH;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java
new file mode 100644
index 0000000..d7f9b20
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.closure;
+
+import org.apache.ignite.raft.Closure;
+import org.apache.ignite.raft.rpc.Message;
+
+/**
+ * RPC response closure.
+
+ * @param <T>
+ */
+public interface RpcResponseClosure<T extends Message> extends Closure {
+ /**
+ * Called by request handler to set response.
+ *
+ * @param resp rpc response
+ */
+ void setResponse(T resp);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java
new file mode 100644
index 0000000..e3dfbf0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.closure;
+
+import org.apache.ignite.raft.rpc.Message;
+
+/**
+ * RpcResponseClosure adapter holds the response.
+
+ * @param <T>
+ */
+public abstract class RpcResponseClosureAdapter<T extends Message> implements RpcResponseClosure<T> {
+ private T resp;
+
+ public T getResponse() {
+ return this.resp;
+ }
+
+ @Override
+ public void setResponse(T resp) {
+ this.resp = resp;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java
new file mode 100644
index 0000000..8a3fa47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: cli.proto
+
+package org.apache.ignite.raft.rpc;
+
+public final class CliRequests {
+ private CliRequests() {
+ }
+
+ public interface AddPeerRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ String getPeerId();
+
+ interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder setPeerId(String peerId);
+
+ AddPeerRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createAddPeerRequest();
+ }
+ }
+
+ public interface AddPeerResponse extends Message {
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createAddPeerResponse();
+ }
+
+ java.util.List<String> getOldPeersList();
+
+ int getOldPeersCount();
+
+ String getOldPeers(int index);
+
+ java.util.List<String> getNewPeersList();
+
+ int getNewPeersCount();
+
+ String getNewPeers(int index);
+
+ public interface Builder {
+ Builder addOldPeers(String oldPeersId);
+
+ Builder addNewPeers(String newPeersId);
+
+ AddPeerResponse build();
+ }
+ }
+
+ public interface RemovePeerRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ String getPeerId();
+
+ interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder setPeerId(String peerId);
+
+ RemovePeerRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createRemovePeerRequest();
+ }
+ }
+
+ public interface RemovePeerResponse extends Message {
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createRemovePeerResponse();
+ }
+
+ java.util.List<String> getOldPeersList();
+
+ int getOldPeersCount();
+
+ String getOldPeers(int index);
+
+ java.util.List<String> getNewPeersList();
+
+ int getNewPeersCount();
+
+ String getNewPeers(int index);
+
+ public interface Builder {
+ Builder addOldPeers(String oldPeerId);
+
+ Builder addNewPeers(String newPeerId);
+
+ RemovePeerResponse build();
+ }
+ }
+
+ public interface ChangePeersRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ java.util.List<String> getNewPeersList();
+
+ int getNewPeersCount();
+
+ String getNewPeers(int index);
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder addNewPeers(String peerId);
+
+ ChangePeersRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createChangePeerRequest();
+ }
+ }
+
+ public interface ChangePeersResponse extends Message {
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createChangePeerResponse();
+ }
+
+ java.util.List<String> getOldPeersList();
+
+ int getOldPeersCount();
+
+ String getOldPeers(int index);
+
+ java.util.List<String> getNewPeersList();
+
+ int getNewPeersCount();
+
+ String getNewPeers(int index);
+
+ public interface Builder {
+ Builder addOldPeers(String oldPeerId);
+
+ Builder addNewPeers(String newPeerId);
+
+ ChangePeersResponse build();
+ }
+ }
+
+ public interface SnapshotRequest extends Message {
+ String getGroupId();
+
+ String getPeerId();
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setPeerId(String peerId);
+
+ SnapshotRequest build();
+ }
+
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createSnapshotRequest();
+ }
+ }
+
+ public interface ResetPeerRequest extends Message {
+ String getGroupId();
+
+ String getPeerId();
+
+ java.util.List<String> getOldPeersList();
+
+ int getOldPeersCount();
+
+ String getOldPeers(int index);
+
+ java.util.List<String> getNewPeersList();
+
+ int getNewPeersCount();
+
+ String getNewPeers(int index);
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setPeerId(String peerId);
+
+ Builder addNewPeers(String peerId);
+
+ ResetPeerRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createResetPeerRequest();
+ }
+ }
+
+ public interface TransferLeaderRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ String getPeerId();
+
+ boolean hasPeerId();
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder setPeerId(String peerId);
+
+ TransferLeaderRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createTransferLeaderRequest();
+ }
+ }
+
+ public interface GetLeaderRequest extends Message {
+ String getGroupId();
+
+ String getPeerId();
+
+ boolean hasPeerId();
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setPeerId(String peerId);
+
+ GetLeaderRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createGetLeaderRequest();
+ }
+ }
+
+ public interface GetLeaderResponse extends Message {
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
+ }
+
+ String getLeaderId();
+
+ public interface Builder {
+ GetLeaderResponse build();
+
+ Builder setLeaderId(String leaderId);
+ }
+ }
+
+ public interface GetPeersRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ boolean getOnlyAlive();
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder setOnlyAlive(boolean onlyGetAlive);
+
+ GetPeersRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createGetPeersRequest();
+ }
+ }
+
+ public interface GetPeersResponse extends Message {
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createGetPeersResponse();
+ }
+
+ java.util.List<String> getPeersList();
+
+ int getPeersCount();
+
+ String getPeers(int index);
+
+ java.util.List<String> getLearnersList();
+
+ int getLearnersCount();
+
+ String getLearners(int index);
+
+ public interface Builder {
+ Builder addPeers(String peerId);
+
+ Builder addLearners(String learnerId);
+
+ GetPeersResponse build();
+ }
+ }
+
+ public interface AddLearnersRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ java.util.List<String> getLearnersList();
+
+ int getLearnersCount();
+
+ String getLearners(int index);
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder addLearners(String learnerId);
+
+ AddLearnersRequest build();
+ }
+
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createAddLearnersRequest();
+ }
+ }
+
+ public interface RemoveLearnersRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ java.util.List<String> getLearnersList();
+
+ int getLearnersCount();
+
+ String getLearners(int index);
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder addLearners(String leaderId);
+
+ RemoveLearnersRequest build();
+ }
+
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createRemoveLearnersRequest();
+ }
+ }
+
+ public interface ResetLearnersRequest extends Message {
+ String getGroupId();
+
+ String getLeaderId();
+
+ java.util.List<String> getLearnersList();
+
+ /**
+ * <code>repeated string learners = 3;</code>
+ */
+ int getLearnersCount();
+
+ /**
+ * <code>repeated string learners = 3;</code>
+ */
+ String getLearners(int index);
+
+ public interface Builder {
+ Builder setGroupId(String groupId);
+
+ Builder setLeaderId(String leaderId);
+
+ Builder addLearners(String learnerId);
+
+ ResetLearnersRequest build();
+ }
+
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createResetLearnersRequest();
+ }
+ }
+
+ public interface LearnersOpResponse extends Message {
+ java.util.List<String> getOldLearnersList();
+
+ int getOldLearnersCount();
+
+ String getOldLearners(int index);
+
+ java.util.List<String> getNewLearnersList();
+
+ int getNewLearnersCount();
+
+ String getNewLearners(int index);
+
+ static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createLearnersOpResponse();
+ }
+
+ public interface Builder {
+ Builder addOldLearners(String oldLearnersId);
+
+ Builder addNewLearners(String newLearnersId);
+
+ LearnersOpResponse build();
+ }
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java
new file mode 100644
index 0000000..fd88be2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * RPC connection
+ */
+public interface Connection {
+ /**
+ * Get the attribute that bound to the connection.
+ *
+ * @param key the attribute key
+ * @return the attribute value
+ */
+ Object getAttribute(final String key);
+
+ /**
+ * Set the attribute to the connection.
+ *
+ * @param key the attribute key
+ * @param value the attribute value
+ */
+ void setAttribute(final String key, final Object value);
+
+ /**
+ * Set the attribute to the connection if the key's item doesn't exist, otherwise returns the present item.
+ *
+ * @param key the attribute key
+ * @param value the attribute value
+ * @return the previous value associated with the specified key, or
+ * <tt>null</tt> if there was no mapping for the key.
+ */
+ Object setAttributeIfAbsent(final String key, final Object value);
+
+ /**
+ * Close the connection.
+ */
+ void close();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java
new file mode 100644
index 0000000..176fa84
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ *
+ */
+public interface ConnectionClosedEventListener {
+ void onClosed(final String remoteAddress, final Connection conn);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
new file mode 100644
index 0000000..010ce51
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public interface InvokeCallback {
+ void complete(final Object result, final Throwable err);
+
+ default Executor executor() {
+ return null;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java
new file mode 100644
index 0000000..0633ddd
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * RPC invoke context.
+ */
+public class InvokeContext {
+ private final ConcurrentMap<String, Object> ctx = new ConcurrentHashMap<>();
+
+ public Object put(final String key, final Object value) {
+ return this.ctx.put(key, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T get(final String key) {
+ return (T) this.ctx.get(key);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getOrDefault(final String key, final T defaultValue) {
+ return (T) this.ctx.getOrDefault(key, defaultValue);
+ }
+
+ public void clear() {
+ this.ctx.clear();
+ }
+
+ public Set<Map.Entry<String, Object>> entrySet() {
+ return this.ctx.entrySet();
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java
new file mode 100644
index 0000000..135de0a
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ *
+ */
+public class InvokeTimeoutException extends RemotingException {
+ private static final long serialVersionUID = -4710810309766380565L;
+
+ public InvokeTimeoutException() {
+ }
+
+ public InvokeTimeoutException(String message) {
+ super(message);
+ }
+
+ public InvokeTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public InvokeTimeoutException(Throwable cause) {
+ super(cause);
+ }
+
+ public InvokeTimeoutException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
new file mode 100644
index 0000000..0e36975
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
@@ -0,0 +1,11 @@
+package org.apache.ignite.raft.rpc;
+
+import java.io.Serializable;
+
+/**
+ * The base message.
+ * <p>
+ * Extends Serializable for compatibility with JDK serialization.
+ */
+public interface Message extends Serializable {
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java
new file mode 100644
index 0000000..03b70da
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java
@@ -0,0 +1,50 @@
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.rpc.message.DefaultMessageBuilderFactory;
+
+/** */
+public interface MessageBuilderFactory {
+ // TODO asch must be injected.
+ public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
+
+ // Ping.
+ RpcRequests.PingRequest.Builder createPingRequest();
+
+ // Error.
+ RpcRequests.ErrorResponse.Builder createErrorResponse();
+
+ // CLI
+ CliRequests.AddPeerRequest.Builder createAddPeerRequest();
+
+ CliRequests.AddPeerResponse.Builder createAddPeerResponse();
+
+ CliRequests.RemovePeerRequest.Builder createRemovePeerRequest();
+
+ CliRequests.RemovePeerResponse.Builder createRemovePeerResponse();
+
+ CliRequests.ChangePeersRequest.Builder createChangePeerRequest();
+
+ CliRequests.ChangePeersResponse.Builder createChangePeerResponse();
+
+ CliRequests.SnapshotRequest.Builder createSnapshotRequest();
+
+ CliRequests.ResetPeerRequest.Builder createResetPeerRequest();
+
+ CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest();
+
+ CliRequests.GetLeaderRequest.Builder createGetLeaderRequest();
+
+ CliRequests.GetLeaderResponse.Builder createGetLeaderResponse();
+
+ CliRequests.GetPeersRequest.Builder createGetPeersRequest();
+
+ CliRequests.GetPeersResponse.Builder createGetPeersResponse();
+
+ CliRequests.AddLearnersRequest.Builder createAddLearnersRequest();
+
+ CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest();
+
+ CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest();
+
+ CliRequests.LearnersOpResponse.Builder createLearnersOpResponse();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java
new file mode 100644
index 0000000..f87e993
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Named thread factory with prefix.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ * <p>
+ * 2018-Mar-21 11:32:02 AM
+ */
+public class NamedThreadFactory implements ThreadFactory {
+ private static System.Logger LOG = System.getLogger(NamedThreadFactory.class.getName());
+
+ private static final LogUncaughtExceptionHandler UNCAUGHT_EX_HANDLER = new LogUncaughtExceptionHandler();
+
+ private final String prefix;
+
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final boolean daemon;
+
+ public NamedThreadFactory(String prefix) {
+ this(prefix, false);
+ }
+
+ public NamedThreadFactory(String prefix, boolean daemon) {
+ super();
+ this.prefix = prefix;
+ this.daemon = daemon;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(this.daemon);
+ t.setUncaughtExceptionHandler(UNCAUGHT_EX_HANDLER);
+ t.setName(this.prefix + counter.getAndIncrement());
+ return t;
+ }
+
+ private static final class LogUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.log(System.Logger.Level.ERROR, "Uncaught exception in thread {}", t, e);
+ }
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java
new file mode 100644
index 0000000..8bf6db2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * Exception for default remoting problems, like the endpoint is unreachable.
+ */
+public class RemotingException extends Exception {
+ private static final long serialVersionUID = -6326244159775972292L;
+
+ public RemotingException() {
+ }
+
+ public RemotingException(String message) {
+ super(message);
+ }
+
+ public RemotingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RemotingException(Throwable cause) {
+ super(cause);
+ }
+
+ public RemotingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
new file mode 100644
index 0000000..8caee80
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.Lifecycle;
+
+/**
+ *
+ */
+public interface RpcClient extends Lifecycle<RpcOptions> {
+ /**
+ * Check connection for given address.
+ *
+ * @param endpoint target address
+ * @return true if there is a connection and the connection is active and writable.
+ */
+ boolean checkConnection(final Endpoint endpoint);
+
+ /**
+ * Check connection for given address and async to create a new one if there is no connection.
+ * @param endpoint target address
+ * @param createIfAbsent create a new one if there is no connection
+ * @return true if there is a connection and the connection is active and writable.
+ */
+ boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
+
+ /**
+ * Close all connections of a address.
+ *
+ * @param endpoint target address
+ */
+ void closeConnection(final Endpoint endpoint);
+
+ /**
+ * Register a connect event listener.
+ *
+ * @param listener listener.
+ */
+ void registerConnectEventListener(final ConnectionOpenedEventListener listener);
+
+ /**
+ * Synchronous invocation.
+ *
+ * @param endpoint target address
+ * @param request request object
+ * @param timeoutMs timeout millisecond
+ * @return invoke result
+ */
+ default Object invokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
+ throws InterruptedException, RemotingException {
+ return invokeSync(endpoint, request, null, timeoutMs);
+ }
+
+ /**
+ * Synchronous invocation using a invoke context.
+ *
+ * @param endpoint target address
+ * @param request request object
+ * @param ctx invoke context
+ * @param timeoutMs timeout millisecond
+ * @return invoke result
+ */
+ Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
+ final long timeoutMs) throws InterruptedException, RemotingException;
+
+ /**
+ * Asynchronous invocation with a callback.
+ *
+ * @param endpoint target address
+ * @param request request object
+ * @param callback invoke callback
+ * @param timeoutMs timeout millisecond
+ */
+ default void invokeAsync(final Endpoint endpoint, final Object request, final InvokeCallback callback,
+ final long timeoutMs) throws InterruptedException, RemotingException {
+ invokeAsync(endpoint, request, null, callback, timeoutMs);
+ }
+
+ /**
+ * Asynchronous invocation with a callback.
+ *
+ * @param endpoint target address
+ * @param request request object
+ * @param ctx invoke context
+ * @param callback invoke callback
+ * @param timeoutMs timeout millisecond
+ */
+ void invokeAsync(final Endpoint endpoint, final Object request, final InvokeContext ctx, final InvokeCallback callback,
+ final long timeoutMs) throws InterruptedException, RemotingException;
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java
new file mode 100644
index 0000000..1f7fcf8
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * @author jiachun.fjc
+ */
+public interface RpcContext {
+
+ /**
+ * Send a response back.
+ *
+ * @param responseObj the response object
+ */
+ void sendResponse(final Object responseObj);
+
+ /**
+ * Get current connection.
+ *
+ * @return current connection
+ */
+ Connection getConnection();
+
+ /**
+ * Get the remote address.
+ *
+ * @return remote address
+ */
+ String getRemoteAddress();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java
new file mode 100644
index 0000000..e3135c7
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java
@@ -0,0 +1,25 @@
+package org.apache.ignite.raft.rpc;
+
+/**
+ * Basic RPC options.
+ */
+public class RpcOptions {
+ private int rpcConnectTimeoutMs = 5_000;
+ private int rpcDefaultTimeout = 5_000;
+
+ public int getRpcConnectTimeoutMs() {
+ return rpcConnectTimeoutMs;
+ }
+
+ public void setRpcConnectTimeoutMs(int rpcConnectTimeoutMs) {
+ this.rpcConnectTimeoutMs = rpcConnectTimeoutMs;
+ }
+
+ public int getRpcDefaultTimeout() {
+ return rpcDefaultTimeout;
+ }
+
+ public void setRpcDefaultTimeout(int rpcDefaultTimeout) {
+ this.rpcDefaultTimeout = rpcDefaultTimeout;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java
new file mode 100644
index 0000000..c6dedd0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Defined functions for process user defined request.
+ *
+ * @author jiachun.fjc
+ */
+public interface RpcProcessor<T> {
+
+ /**
+ * Async to handle request with {@link RpcContext}.
+ *
+ * @param rpcCtx the rpc context
+ * @param request the request
+ */
+ void handleRequest(final RpcContext rpcCtx, final T request);
+
+ /**
+ * The class name of user request.
+ * Use String type to avoid loading class.
+ *
+ * @return interested request's class name
+ */
+ String interest();
+
+ /**
+ * Get user's executor.
+ *
+ * @return executor
+ */
+ default Executor executor() {
+ return null;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java
new file mode 100644
index 0000000..048065c
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: rpc.proto
+
+package org.apache.ignite.raft.rpc;
+
+public final class RpcRequests {
+ private RpcRequests() {
+ }
+
+ public interface ErrorResponse extends Message {
+ int getErrorCode();
+
+ String getErrorMsg();
+
+ interface Builder {
+ Builder setErrorCode(int code);
+
+ Builder setErrorMsg(String msg);
+
+ ErrorResponse build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createErrorResponse();
+ }
+ }
+
+ public interface PingRequest extends Message {
+ /**
+ * <code>required int64 send_timestamp = 1;</code>
+ */
+ long getSendTimestamp();
+
+ interface Builder {
+ Builder setSendTimestamp(long timestamp);
+
+ PingRequest build();
+ }
+
+ public static Builder newBuilder() {
+ return MessageBuilderFactory.DEFAULT.createPingRequest();
+ }
+ }
+}
+
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java
new file mode 100644
index 0000000..ce17a56
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.Lifecycle;
+
+/**
+ *
+ * @author jiachun.fjc
+ */
+public interface RpcServer extends Lifecycle<RpcOptions> {
+
+ /**
+ * Register a conn closed event listener.
+ *
+ * @param listener the event listener.
+ */
+ void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener);
+
+ /**
+ * Register user processor.
+ *
+ * @param processor the user processor which has a interest
+ */
+ void registerProcessor(final RpcProcessor<?> processor);
+
+ /**
+ *
+ * @return bound port
+ */
+ int boundPort();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
new file mode 100644
index 0000000..9bbaaa3
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.raft.Closure;
+import org.apache.ignite.raft.Status;
+
+/**
+ * RPC utilities.
+ */
+public final class RpcUtils {
+ private static final System.Logger LOG = System.getLogger(RpcUtils.class.getName());
+
+ /**
+ * Default jraft closure executor pool minimum size.
+ */
+ public static final int MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE = 1;
+
+ /**
+ * Default jraft closure executor pool maximum size.
+ */
+ public static final int MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * Global thread pool to run rpc closure.
+ */
+ public static ThreadPoolExecutor RPC_CLOSURE_EXECUTOR = new ThreadPoolExecutor(MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE,
+ MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new NamedThreadFactory("JRaft-Rpc-Closure-Executor-", true));
+
+ /**
+ * Run closure with OK status in thread pool.
+ */
+ public static Future<?> runClosureInThread(final Closure done) {
+ if (done == null) {
+ return null;
+ }
+ return runClosureInThread(done, Status.OK());
+ }
+
+ /**
+ * Run a task in thread pool, returns the future object.
+ */
+ public static Future<?> runInThread(final Runnable runnable) {
+ return RPC_CLOSURE_EXECUTOR.submit(runnable);
+ }
+
+ /**
+ * Run closure with status in thread pool.
+ */
+ public static Future<?> runClosureInThread(final Closure done, final Status status) {
+ if (done == null) {
+ return null;
+ }
+
+ return runInThread(() -> {
+ try {
+ done.run(status);
+ } catch (final Throwable t) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to run done closure.", t);
+ }
+ });
+ }
+
+ /**
+ * Run closure with status in specified executor
+ */
+ public static void runClosureInExecutor(final Executor executor, final Closure done, final Status status) {
+ if (done == null) {
+ return;
+ }
+
+ if (executor == null) {
+ runClosureInThread(done, status);
+ return;
+ }
+
+ executor.execute(() -> {
+ try {
+ done.run(status);
+ } catch (final Throwable t) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to run done closure.", t);
+ }
+ });
+ }
+
+ private RpcUtils() {
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
similarity index 86%
copy from modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
index db63614..4041316 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
@@ -1,16 +1,13 @@
-package com.alipay.sofa.jraft.rpc.impl;
+package org.apache.ignite.raft.rpc.impl;
-import com.alipay.sofa.jraft.rpc.Connection;
-import com.alipay.sofa.jraft.rpc.Message;
-import com.alipay.sofa.jraft.rpc.RpcRequests;
-import com.alipay.sofa.jraft.util.Endpoint;
-import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
+import org.apache.ignite.raft.rpc.Connection;
+import org.apache.ignite.raft.rpc.Message;
public class LocalConnection implements Connection {
private static boolean RECORD_ALL_MESSAGES = false;
@@ -41,7 +38,8 @@ public class LocalConnection implements Connection {
private void send(Message request, Future fut) {
Object[] tuple = {client, request, fut};
- assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+
+ srv.incoming.offer(tuple);
}
public void onBeforeRequestSend(Message request, Future fut) {
@@ -62,10 +60,11 @@ public class LocalConnection implements Connection {
}
public void onAfterResponseSend(Message msg, Throwable err) {
- assert err == null : err;
+ if (msg == null) // Ignore timeouts.
+ return;
if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg))
- recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg});
+ recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg, err});
}
@Override public Object getAttribute(String key) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java
new file mode 100644
index 0000000..4c9a98f
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.ConnectionOpenedEventListener;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.InvokeContext;
+import org.apache.ignite.raft.rpc.InvokeTimeoutException;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RemotingException;
+import org.apache.ignite.raft.rpc.RpcClient;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcUtils;
+
+/**
+ * Local rpc client impl.
+ *
+ * @author ascherbakov.
+ */
+public class LocalRpcClient implements RpcClient {
+ private List<ConnectionOpenedEventListener> listeners = new CopyOnWriteArrayList<>();
+
+ @Override public boolean checkConnection(Endpoint endpoint) {
+ return LocalRpcServer.connect(this, endpoint, false, null);
+ }
+
+ @Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
+ return LocalRpcServer.connect(this, endpoint, createIfAbsent, this::onCreated);
+ }
+
+ @Override public void closeConnection(Endpoint endpoint) {
+ LocalRpcServer.closeConnection(this, endpoint);
+ }
+
+ @Override public void registerConnectEventListener(ConnectionOpenedEventListener listener) {
+ if (!listeners.contains(listeners))
+ listeners.add(listener);
+ }
+
+ private void onCreated(LocalConnection conn) {
+ for (ConnectionOpenedEventListener listener : listeners) {
+ listener.onOpened(conn.srv.local.toString(), conn);
+ }
+ }
+
+ @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
+ if (!checkConnection(endpoint, true))
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+ if (srv == null)
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalConnection locConn = srv.conns.get(this);
+ if (locConn == null)
+ throw new RemotingException("Server is dead " + endpoint);
+
+ CompletableFuture<Object> fut = new CompletableFuture();
+
+ locConn.onBeforeRequestSend((Message) request, fut);
+
+ try {
+ return fut.whenComplete((res, err) -> locConn.onAfterResponseSend((Message) res, err)).get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new RemotingException(e);
+ } catch (TimeoutException e) {
+ throw new InvokeTimeoutException(e);
+ }
+ }
+
+ @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException {
+ if (!checkConnection(endpoint, true))
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+ if (srv == null)
+ throw new RemotingException("Server is dead " + endpoint);
+
+ LocalConnection locConn = srv.conns.get(this);
+ if (locConn == null)
+ throw new RemotingException("Server is dead " + endpoint);
+
+ CompletableFuture<Object> fut = new CompletableFuture<>();
+
+ locConn.onBeforeRequestSend((Message) request, fut);
+
+ fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).whenComplete((res, err) -> {
+ locConn.onAfterResponseSend((Message) res, err);
+
+ RpcUtils.runInThread(() -> callback.complete(res,
+ err instanceof TimeoutException ? new InvokeTimeoutException(err) : err)); // Avoid deadlocks if a closure has completed in the same thread.
+ });
+ }
+
+ @Override public boolean init(RpcOptions opts) {
+ return true;
+ }
+
+ @Override public void shutdown() {
+ // Close all connection from this peer.
+ for (LocalRpcServer value : LocalRpcServer.servers.values())
+ LocalRpcServer.closeConnection(this, value.local);
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
similarity index 77%
copy from modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
index db9648a..18b1909 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
@@ -14,15 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alipay.sofa.jraft.rpc.impl;
-
-import com.alipay.sofa.jraft.rpc.Connection;
-import com.alipay.sofa.jraft.rpc.Message;
-import com.alipay.sofa.jraft.rpc.RpcContext;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.util.Endpoint;
-import com.alipay.sofa.jraft.util.NamedThreadFactory;
+package org.apache.ignite.raft.rpc.impl;
+
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -31,13 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.Connection;
+import org.apache.ignite.raft.rpc.ConnectionClosedEventListener;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcContext;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcProcessor;
+import org.apache.ignite.raft.rpc.RpcServer;
+import org.apache.ignite.raft.rpc.RpcUtils;
/**
* Local RPC server impl.
@@ -45,8 +42,6 @@ import org.slf4j.LoggerFactory;
* @author ascherbakov.
*/
public class LocalRpcServer implements RpcServer {
- private static final Logger LOG = LoggerFactory.getLogger(LocalRpcServer.class);
-
/** Running servers. */
public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
@@ -65,9 +60,6 @@ public class LocalRpcServer implements RpcServer {
BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
- // TODO FIXME asch Or better use com.alipay.sofa.jraft.rpc.RpcUtils.RPC_CLOSURE_EXECUTOR ?
- private ExecutorService defaultExecutor;
-
public LocalRpcServer(Endpoint local) {
this.local = local;
}
@@ -122,7 +114,7 @@ public class LocalRpcServer implements RpcServer {
return local.getPort();
}
- @Override public synchronized boolean init(Void opts) {
+ @Override public synchronized boolean init(RpcOptions opts) {
if (started)
return false;
@@ -154,16 +146,10 @@ public class LocalRpcServer implements RpcServer {
}
}
- RpcProcessor.ExecutorSelector selector = prc.executorSelector();
-
- Executor executor = null;
-
- if (selector != null) {
- executor = selector.select(null, msg);
- }
+ Executor executor = prc.executor();
if (executor == null)
- executor = defaultExecutor;
+ executor = RpcUtils.RPC_CLOSURE_EXECUTOR;
RpcProcessor finalPrc = prc;
@@ -189,14 +175,14 @@ public class LocalRpcServer implements RpcServer {
}
});
- defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
+ started = true;
- worker.setName("LocalRPCServer-Dispatch-Thread: " + local.toString());
+ worker.setName("LocalRPCServer-Dispatch-Thread: " + local.toString()); // TODO asch use MPSC pattern ?
worker.start();
servers.put(local, this);
- started = true;
+
return true;
}
@@ -214,17 +200,6 @@ public class LocalRpcServer implements RpcServer {
throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
}
- defaultExecutor.shutdownNow();
-
- try {
- boolean stopped = defaultExecutor.awaitTermination(60_000, TimeUnit.MILLISECONDS);
-
- if (!stopped) // TODO asch make thread dump.
- LOG.error("Failed to wait for graceful executor shutdown, probably some task is hanging.");
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
- }
-
// Close all connections to this server.
for (LocalRpcClient client : conns.keySet())
closeConnection(client, local);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java
new file mode 100644
index 0000000..940c653
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class AddLearnersRequestImpl implements CliRequests.AddLearnersRequest, CliRequests.AddLearnersRequest.Builder {
+ private String groupId;
+ private String leaderId;
+ private List<String> learnersList = new ArrayList<>();
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public List<String> getLearnersList() {
+ return learnersList;
+ }
+
+ @Override public int getLearnersCount() {
+ return learnersList.size();
+ }
+
+ @Override public String getLearners(int index) {
+ return learnersList.get(index);
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+
+ @Override public Builder addLearners(String learnerId) {
+ learnersList.add(learnerId);
+
+ return this;
+ }
+
+ @Override public CliRequests.AddLearnersRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java
new file mode 100644
index 0000000..cde6e81
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java
@@ -0,0 +1,37 @@
+package org.apache.ignite.raft.rpc.message;
+
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class CreateGetLeaderRequestImpl implements CliRequests.GetLeaderRequest, CliRequests.GetLeaderRequest.Builder {
+ private String groupId;
+ private String peerId;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public boolean hasPeerId() {
+ return peerId != null;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public CliRequests.GetLeaderRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java
new file mode 100644
index 0000000..517eb47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java
@@ -0,0 +1,22 @@
+package org.apache.ignite.raft.rpc.message;
+
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class CreateGetLeaderResponseImpl implements CliRequests.GetLeaderResponse, CliRequests.GetLeaderResponse.Builder {
+ private String leaderId;
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public CliRequests.GetLeaderResponse build() {
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java
new file mode 100644
index 0000000..9f3ef49
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java
@@ -0,0 +1,86 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.MessageBuilderFactory;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Default message builders factory.
+ */
+public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
+ @Override public RpcRequests.PingRequest.Builder createPingRequest() {
+ return new PingRequestImpl();
+ }
+
+ @Override public RpcRequests.ErrorResponse.Builder createErrorResponse() {
+ return new ErrorResponseImpl();
+ }
+
+ @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() {
+ return new AddPeerRequestImpl();
+ }
+
+ @Override public CliRequests.AddPeerResponse.Builder createAddPeerResponse() {
+ return new AddPeerResponseImpl();
+ }
+
+ @Override public CliRequests.RemovePeerRequest.Builder createRemovePeerRequest() {
+ return new RemovePeerRequestImpl();
+ }
+
+ @Override public CliRequests.RemovePeerResponse.Builder createRemovePeerResponse() {
+ return new RemovePeerResponseImpl();
+ }
+
+ @Override public CliRequests.ChangePeersRequest.Builder createChangePeerRequest() {
+ return new ChangePeerRequestImpl();
+ }
+
+ @Override public CliRequests.ChangePeersResponse.Builder createChangePeerResponse() {
+ return new ChangePeersResponseImpl();
+ }
+
+ @Override public CliRequests.SnapshotRequest.Builder createSnapshotRequest() {
+ return new SnapshotRequestImpl();
+ }
+
+ @Override public CliRequests.ResetPeerRequest.Builder createResetPeerRequest() {
+ return new ResetPeerRequestImpl();
+ }
+
+ @Override public CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest() {
+ return new TransferLeaderRequestImpl();
+ }
+
+ @Override public CliRequests.GetLeaderRequest.Builder createGetLeaderRequest() {
+ return new CreateGetLeaderRequestImpl();
+ }
+
+ @Override public CliRequests.GetLeaderResponse.Builder createGetLeaderResponse() {
+ return new CreateGetLeaderResponseImpl();
+ }
+
+ @Override public CliRequests.GetPeersRequest.Builder createGetPeersRequest() {
+ return new GetPeersRequestImpl();
+ }
+
+ @Override public CliRequests.GetPeersResponse.Builder createGetPeersResponse() {
+ return new GetPeersResponseImpl();
+ }
+
+ @Override public CliRequests.AddLearnersRequest.Builder createAddLearnersRequest() {
+ return new AddLearnersRequestImpl();
+ }
+
+ @Override public CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest() {
+ return new RemoveLearnersRequestImpl();
+ }
+
+ @Override public CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest() {
+ return new ResetLearnersRequestImpl();
+ }
+
+ @Override public CliRequests.LearnersOpResponse.Builder createLearnersOpResponse() {
+ return new LearnersOpResponseImpl();
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
new file mode 100644
index 0000000..ad1f2b7
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
@@ -0,0 +1,25 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+ @Override public int getErrorCode() {
+ return 0;
+ }
+
+ @Override public String getErrorMsg() {
+ return null;
+ }
+
+ @Override public Builder setErrorCode(int code) {
+ return null;
+ }
+
+ @Override public Builder setErrorMsg(String msg) {
+ return null;
+ }
+
+ @Override public RpcRequests.ErrorResponse build() {
+ return null;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java
new file mode 100644
index 0000000..06ff501
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java
@@ -0,0 +1,50 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class LearnersOpResponseImpl implements CliRequests.LearnersOpResponse, CliRequests.LearnersOpResponse.Builder {
+ private List<String> oldLearnersList = new ArrayList<>();
+ private List<String> newLearnersList = new ArrayList<>();
+
+ @Override public List<String> getOldLearnersList() {
+ return oldLearnersList;
+ }
+
+ @Override public int getOldLearnersCount() {
+ return oldLearnersList.size();
+ }
+
+ @Override public String getOldLearners(int index) {
+ return oldLearnersList.get(index);
+ }
+
+ @Override public List<String> getNewLearnersList() {
+ return newLearnersList;
+ }
+
+ @Override public int getNewLearnersCount() {
+ return newLearnersList.size();
+ }
+
+ @Override public String getNewLearners(int index) {
+ return newLearnersList.get(index);
+ }
+
+ @Override public Builder addOldLearners(String oldLearnersId) {
+ oldLearnersList.add(oldLearnersId);
+
+ return this;
+ }
+
+ @Override public Builder addNewLearners(String newLearnersId) {
+ newLearnersList.add(newLearnersId);
+
+ return this;
+ }
+
+ @Override public CliRequests.LearnersOpResponse build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java
new file mode 100644
index 0000000..b4eba88
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java
@@ -0,0 +1,21 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+class PingRequestImpl implements RpcRequests.PingRequest, RpcRequests.PingRequest.Builder {
+ private long sendTimestamp;
+
+ @Override public long getSendTimestamp() {
+ return sendTimestamp;
+ }
+
+ @Override public Builder setSendTimestamp(long timestamp) {
+ this.sendTimestamp = timestamp;
+
+ return this;
+ }
+
+ @Override public RpcRequests.PingRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java
new file mode 100644
index 0000000..ef4005c
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class RemoveLearnersRequestImpl implements CliRequests.RemoveLearnersRequest, CliRequests.RemoveLearnersRequest.Builder {
+ private String groupId;
+ private String leaderId;
+ private List<String> learnersList = new ArrayList<>();
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public List<String> getLearnersList() {
+ return learnersList;
+ }
+
+ @Override public int getLearnersCount() {
+ return learnersList.size();
+ }
+
+ @Override public String getLearners(int index) {
+ return learnersList.get(index);
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+
+ @Override public Builder addLearners(String learnerId) {
+ learnersList.add(learnerId);
+
+ return this;
+ }
+
+ @Override public CliRequests.RemoveLearnersRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java
new file mode 100644
index 0000000..80ea2cb
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class ResetLearnersRequestImpl implements CliRequests.ResetLearnersRequest, CliRequests.ResetLearnersRequest.Builder {
+ private String groupId;
+ private String leaderId;
+ private List<String> learnersList = new ArrayList<>();
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public List<String> getLearnersList() {
+ return learnersList;
+ }
+
+ @Override public int getLearnersCount() {
+ return learnersList.size();
+ }
+
+ @Override public String getLearners(int index) {
+ return learnersList.get(index);
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+
+ @Override public Builder addLearners(String learnerId) {
+ learnersList.add(learnerId);
+
+ return this;
+ }
+
+ @Override public CliRequests.ResetLearnersRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java
new file mode 100644
index 0000000..00dfb47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java
@@ -0,0 +1,32 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class SnapshotRequestImpl implements CliRequests.SnapshotRequest, CliRequests.SnapshotRequest.Builder {
+ private String groupId;
+ private String peerId;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public CliRequests.SnapshotRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java
new file mode 100644
index 0000000..5796ee0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java
@@ -0,0 +1,47 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class TransferLeaderRequestImpl implements CliRequests.TransferLeaderRequest, CliRequests.TransferLeaderRequest.Builder {
+ private String groupId;
+ private String leaderId;
+ private String peerId;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public boolean hasPeerId() {
+ return peerId != null;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public CliRequests.TransferLeaderRequest build() {
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
new file mode 100644
index 0000000..da09246
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.RaftError;
+import org.apache.ignite.raft.Status;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.InvokeContext;
+import org.apache.ignite.raft.rpc.InvokeTimeoutException;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RemotingException;
+import org.apache.ignite.raft.rpc.RpcClient;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+import org.apache.ignite.raft.rpc.RpcUtils;
+import org.apache.ignite.raft.rpc.impl.LocalRpcClient;
+
+/**
+ * Abstract RPC client service based.
+ */
+public abstract class AbstractClientService implements ClientService {
+ protected static final System.Logger LOG = System.getLogger(AbstractClientService.class.getName());
+
+ protected volatile RpcClient rpcClient;
+ protected RpcOptions rpcOptions;
+
+ public RpcClient getRpcClient() {
+ return this.rpcClient;
+ }
+
+ @Override
+ public boolean isConnected(final Endpoint endpoint) {
+ final RpcClient rc = this.rpcClient;
+ return rc != null && isConnected(rc, endpoint);
+ }
+
+ private static boolean isConnected(final RpcClient rpcClient, final Endpoint endpoint) {
+ return rpcClient.checkConnection(endpoint);
+ }
+
+ @Override
+ public boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent) {
+ final RpcClient rc = this.rpcClient;
+ if (rc == null) {
+ throw new IllegalStateException("Client service is uninitialized.");
+ }
+ return rc.checkConnection(endpoint, createIfAbsent);
+ }
+
+ @Override
+ public synchronized boolean init(final RpcOptions rpcOptions) {
+ if (this.rpcClient != null) {
+ return true;
+ }
+ this.rpcOptions = rpcOptions;
+ return initRpcClient();
+ }
+
+ protected boolean initRpcClient() {
+ this.rpcClient = new LocalRpcClient();
+ this.rpcClient.init(rpcOptions);
+
+ return true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (this.rpcClient != null) {
+ this.rpcClient.shutdown();
+ this.rpcClient = null;
+ }
+ }
+
+ @Override
+ public boolean connect(final Endpoint endpoint) {
+ final RpcClient rc = this.rpcClient;
+ if (rc == null) {
+ throw new IllegalStateException("Client service is uninitialized.");
+ }
+ if (isConnected(rc, endpoint)) {
+ return true;
+ }
+ try {
+ final PingRequest req = PingRequest.newBuilder() //
+ .setSendTimestamp(System.currentTimeMillis()) //
+ .build();
+ final ErrorResponse resp = (ErrorResponse) rc.invokeSync(endpoint, req,
+ this.rpcOptions.getRpcConnectTimeoutMs());
+ return resp.getErrorCode() == 0;
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (final RemotingException e) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to connect {0}, remoting exception: {1}.", endpoint, e.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public boolean disconnect(final Endpoint endpoint) {
+ final RpcClient rc = this.rpcClient;
+ if (rc == null) {
+ return true;
+ }
+ LOG.log(System.Logger.Level.INFO, "Disconnect from {}.", endpoint);
+ rc.closeConnection(endpoint);
+ return true;
+ }
+
+ @Override
+ public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ final RpcResponseClosure<T> done, final int timeoutMs) {
+ return invokeWithDone(endpoint, request, null, done, timeoutMs, null);
+ }
+
+ public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ final InvokeContext ctx,
+ final RpcResponseClosure<T> done, final int timeoutMs,
+ final Executor rpcExecutor) {
+ final RpcClient rc = this.rpcClient;
+ final CompletableFuture<Message> future = new CompletableFuture<>();
+ final Executor currExecutor = rpcExecutor != null ? rpcExecutor : RpcUtils.RPC_CLOSURE_EXECUTOR;
+ try {
+ if (rc == null) {
+ future.completeExceptionally(new IllegalStateException("Client service is uninitialized."));
+ // should be in another thread to avoid dead locking.
+ RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
+ "Client service is uninitialized."));
+ return future;
+ }
+
+ rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ @Override
+ public void complete(final Object result, final Throwable err) {
+ if (future.isCancelled()) {
+ onCanceled(request, done);
+ return;
+ }
+
+ if (err == null) {
+ Status status = Status.OK();
+ Message msg;
+ if (result instanceof ErrorResponse) {
+ status = handleErrorResponse((ErrorResponse) result);
+ msg = (Message) result;
+ } else {
+ msg = (Message) result;
+ }
+ if (done != null) {
+ try {
+ if (status.isOk()) {
+ done.setResponse((T) msg);
+ }
+ done.run(status);
+ } catch (final Throwable t) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is {}.", request, t);
+ }
+ }
+ if (!future.isDone()) {
+ future.complete(msg);
+ }
+ } else {
+ if (done != null) {
+ try {
+ done.run(new Status(err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT
+ : RaftError.EINTERNAL, "RPC exception:" + err.getMessage()));
+ } catch (final Throwable t) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is {}.", request, t);
+ }
+ }
+ if (!future.isDone()) {
+ future.completeExceptionally(err);
+ }
+ }
+ }
+
+ @Override
+ public Executor executor() {
+ return currExecutor;
+ }
+ }, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ future.completeExceptionally(e);
+ // should be in another thread to avoid dead locking.
+ RpcUtils.runClosureInExecutor(currExecutor, done,
+ new Status(RaftError.EINTR, "Sending rpc was interrupted"));
+ } catch (final RemotingException e) {
+ future.completeExceptionally(e);
+ // should be in another thread to avoid dead locking.
+ RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
+ "Fail to send a RPC request:" + e.getMessage()));
+
+ }
+
+ return future;
+ }
+
+ private static Status handleErrorResponse(final ErrorResponse eResp) {
+ final Status status = new Status();
+ status.setCode(eResp.getErrorCode());
+ status.setErrorMsg(eResp.getErrorMsg());
+ return status;
+ }
+
+ private <T extends Message> void onCanceled(final Message request, final RpcResponseClosure<T> done) {
+ if (done != null) {
+ try {
+ done.run(new Status(RaftError.ECANCELED, "RPC request was canceled by future."));
+ } catch (final Throwable t) {
+ LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is {}.", request, t);
+ }
+ }
+ }
+
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
new file mode 100644
index 0000000..8082191
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Cli RPC client service.
+ */
+public interface CliClientService extends ClientService {
+ /**
+ * Adds a peer.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> addPeer(Endpoint endpoint, CliRequests.AddPeerRequest request,
+ RpcResponseClosure<CliRequests.AddPeerResponse> done);
+
+ /**
+ * Removes a peer.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> removePeer(Endpoint endpoint, CliRequests.RemovePeerRequest request,
+ RpcResponseClosure<CliRequests.RemovePeerResponse> done);
+
+ /**
+ * Reset a peer.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> resetPeer(Endpoint endpoint, CliRequests.ResetPeerRequest request,
+ RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+ /**
+ * Do a snapshot.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> snapshot(Endpoint endpoint, CliRequests.SnapshotRequest request,
+ RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+ /**
+ * Change peers.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> changePeers(Endpoint endpoint, CliRequests.ChangePeersRequest request,
+ RpcResponseClosure<CliRequests.ChangePeersResponse> done);
+
+ /**
+ * Add learners
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ * @since 1.3.0
+ */
+ Future<Message> addLearners(Endpoint endpoint, CliRequests.AddLearnersRequest request,
+ RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+ /**
+ * Remove learners
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ * @since 1.3.0
+ */
+ Future<Message> removeLearners(Endpoint endpoint, CliRequests.RemoveLearnersRequest request,
+ RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+ /**
+ * Reset learners
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ * @since 1.3.0
+ */
+ Future<Message> resetLearners(Endpoint endpoint, CliRequests.ResetLearnersRequest request,
+ RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+ /**
+ * Get the group leader.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> getLeader(Endpoint endpoint, CliRequests.GetLeaderRequest request,
+ RpcResponseClosure<CliRequests.GetLeaderResponse> done);
+
+ /**
+ * Transfer leadership to other peer.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> transferLeader(Endpoint endpoint, CliRequests.TransferLeaderRequest request,
+ RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+ /**
+ * Get all peers of the replication group.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> getPeers(Endpoint endpoint, CliRequests.GetPeersRequest request,
+ RpcResponseClosure<CliRequests.GetPeersResponse> done);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
new file mode 100644
index 0000000..94ec3a2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.CliRequests.AddLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.AddPeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.AddPeerResponse;
+import org.apache.ignite.raft.rpc.CliRequests.ChangePeersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderResponse;
+import org.apache.ignite.raft.rpc.CliRequests.GetPeersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.GetPeersResponse;
+import org.apache.ignite.raft.rpc.CliRequests.LearnersOpResponse;
+import org.apache.ignite.raft.rpc.CliRequests.RemoveLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.RemovePeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.RemovePeerResponse;
+import org.apache.ignite.raft.rpc.CliRequests.ResetLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.ResetPeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.TransferLeaderRequest;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+
+/**
+ *
+ */
+public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+ @Override
+ public Future<Message> addPeer(final Endpoint endpoint, final AddPeerRequest request,
+ final RpcResponseClosure<AddPeerResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> removePeer(final Endpoint endpoint, final RemovePeerRequest request,
+ final RpcResponseClosure<RemovePeerResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> resetPeer(final Endpoint endpoint, final ResetPeerRequest request,
+ final RpcResponseClosure<ErrorResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> snapshot(final Endpoint endpoint, final CliRequests.SnapshotRequest request,
+ final RpcResponseClosure<ErrorResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> changePeers(final Endpoint endpoint, final ChangePeersRequest request,
+ final RpcResponseClosure<ChangePeersResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> addLearners(final Endpoint endpoint, final AddLearnersRequest request,
+ final RpcResponseClosure<LearnersOpResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> removeLearners(final Endpoint endpoint, final RemoveLearnersRequest request,
+ final RpcResponseClosure<LearnersOpResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> resetLearners(final Endpoint endpoint, final ResetLearnersRequest request,
+ final RpcResponseClosure<LearnersOpResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> getLeader(final Endpoint endpoint, final GetLeaderRequest request,
+ final RpcResponseClosure<GetLeaderResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> transferLeader(final Endpoint endpoint, final TransferLeaderRequest request,
+ final RpcResponseClosure<ErrorResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
+ @Override
+ public Future<Message> getPeers(final Endpoint endpoint, final GetPeersRequest request,
+ final RpcResponseClosure<GetPeersResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
new file mode 100644
index 0000000..ef4976d
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.Lifecycle;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcOptions;
+
+/**
+ * RPC client service.
+ * TODO not needed
+ */
+public interface ClientService extends Lifecycle<RpcOptions> {
+
+ /**
+ * Connect to endpoint, returns true when success.
+ *
+ * @param endpoint server address
+ * @return true on connect success
+ */
+ boolean connect(final Endpoint endpoint);
+
+ /**
+ * Check connection for given address and async to create a new one if there is no connection.
+ * @param endpoint target address
+ * @param createIfAbsent create a new one if there is no connection
+ * @return true if there is a connection and the connection is active and writable.
+ */
+ boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
+
+ /**
+ * Disconnect from endpoint.
+ *
+ * @param endpoint server address
+ * @return true on disconnect success
+ */
+ boolean disconnect(final Endpoint endpoint);
+
+ /**
+ * Returns true when the endpoint's connection is active.
+ *
+ * @param endpoint server address
+ * @return true on connection is active
+ */
+ boolean isConnected(final Endpoint endpoint);
+
+ /**
+ * Send a requests and waits for response with callback, returns the request future.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @param timeoutMs timeout millis
+ * @return a future with operation result
+ */
+ <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ final RpcResponseClosure<T> done, final int timeoutMs);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
new file mode 100644
index 0000000..916a768
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.StampedLock;
+import org.apache.ignite.raft.Configuration;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.RaftError;
+import org.apache.ignite.raft.Status;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Maintain routes to raft groups.
+ */
+public class RouteTable {
+ private static final System.Logger LOG = System.getLogger(RouteTable.class.getName());
+
+ private static final RouteTable INSTANCE = new RouteTable();
+
+ // Map<groupId, groupConf>
+ private final ConcurrentMap<String, GroupConf> groupConfTable = new ConcurrentHashMap<>();
+
+ public static RouteTable getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Update configuration of group in route table.
+ *
+ * @param groupId raft group id
+ * @param conf configuration to update
+ * @return true on success
+ */
+ public boolean updateConfiguration(final String groupId, final Configuration conf) {
+ final GroupConf gc = getOrCreateGroupConf(groupId);
+ final StampedLock stampedLock = gc.stampedLock;
+ final long stamp = stampedLock.writeLock();
+ try {
+ gc.conf = conf;
+ if (gc.leader != null && !gc.conf.contains(gc.leader)) {
+ gc.leader = null;
+ }
+ } finally {
+ stampedLock.unlockWrite(stamp);
+ }
+ return true;
+ }
+
+ private GroupConf getOrCreateGroupConf(final String groupId) {
+ GroupConf gc = this.groupConfTable.get(groupId);
+ if (gc == null) {
+ gc = new GroupConf();
+ final GroupConf old = this.groupConfTable.putIfAbsent(groupId, gc);
+ if (old != null) {
+ gc = old;
+ }
+ }
+ return gc;
+ }
+
+ /**
+ * Get the cached leader of the group, return it when found, null otherwise.
+ * Make sure calls {@link #refreshLeader(CliClientService, String, int)} already
+ * before invoke this method.
+ *
+ * @param groupId raft group id
+ * @return peer of leader
+ */
+ public PeerId selectLeader(final String groupId) {
+ final GroupConf gc = this.groupConfTable.get(groupId);
+ if (gc == null) {
+ return null;
+ }
+ final StampedLock stampedLock = gc.stampedLock;
+ long stamp = stampedLock.tryOptimisticRead();
+ PeerId leader = gc.leader;
+ if (!stampedLock.validate(stamp)) {
+ stamp = stampedLock.readLock();
+ try {
+ leader = gc.leader;
+ } finally {
+ stampedLock.unlockRead(stamp);
+ }
+ }
+ return leader;
+ }
+
+ /**
+ * Update leader info.
+ *
+ * @param groupId raft group id
+ * @param leader peer of leader
+ * @return true on success
+ */
+ public boolean updateLeader(final String groupId, final PeerId leader) {
+ final GroupConf gc = getOrCreateGroupConf(groupId);
+ final StampedLock stampedLock = gc.stampedLock;
+ final long stamp = stampedLock.writeLock();
+ try {
+ gc.leader = leader;
+ } finally {
+ stampedLock.unlockWrite(stamp);
+ }
+ return true;
+ }
+
+ /**
+ * Update leader info.
+ *
+ * @param groupId raft group id
+ * @param leaderStr peer string of leader
+ * @return true on success
+ */
+ public boolean updateLeader(final String groupId, final String leaderStr) {
+ final PeerId leader = new PeerId();
+ if (leader.parse(leaderStr)) {
+ return updateLeader(groupId, leader);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Get the configuration by groupId, returns null when not found.
+ *
+ * @param groupId raft group id
+ * @return configuration of the group id
+ */
+ public Configuration getConfiguration(final String groupId) {
+ final GroupConf gc = this.groupConfTable.get(groupId);
+ if (gc == null) {
+ return null;
+ }
+ final StampedLock stampedLock = gc.stampedLock;
+ long stamp = stampedLock.tryOptimisticRead();
+ Configuration conf = gc.conf;
+ if (!stampedLock.validate(stamp)) {
+ stamp = stampedLock.readLock();
+ try {
+ conf = gc.conf;
+ } finally {
+ stampedLock.unlockRead(stamp);
+ }
+ }
+ return conf;
+ }
+
+ /**
+ * Blocking the thread until query_leader finishes.
+ *
+ * @param groupId raft group id
+ * @param timeoutMs timeout millis
+ * @return operation status
+ */
+ public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
+ throws InterruptedException,
+ TimeoutException {
+ final Configuration conf = getConfiguration(groupId);
+ if (conf == null) {
+ return new Status(RaftError.ENOENT,
+ "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
+ }
+ final Status st = Status.OK();
+ final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
+ rb.setGroupId(groupId);
+ final CliRequests.GetLeaderRequest request = rb.build();
+ TimeoutException timeoutException = null;
+ for (final PeerId peer : conf) {
+ if (!cliClientService.connect(peer.getEndpoint())) {
+ if (st.isOk()) {
+ st.setError(-1, "Fail to init channel to %s", peer);
+ } else {
+ final String savedMsg = st.getErrorMsg();
+ st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
+ }
+ continue;
+ }
+ final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
+ try {
+ final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
+ if (msg instanceof RpcRequests.ErrorResponse) {
+ if (st.isOk()) {
+ st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
+ } else {
+ final String savedMsg = st.getErrorMsg();
+ st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
+ }
+ } else {
+ final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
+ updateLeader(groupId, response.getLeaderId());
+ return Status.OK();
+ }
+ } catch (final TimeoutException e) {
+ timeoutException = e;
+ } catch (final ExecutionException e) {
+ if (st.isOk()) {
+ st.setError(-1, e.getMessage());
+ } else {
+ final String savedMsg = st.getErrorMsg();
+ st.setError(-1, "%s, %s", savedMsg, e.getMessage());
+ }
+ }
+ }
+ if (timeoutException != null) {
+ throw timeoutException;
+ }
+
+ return st;
+ }
+
+ public Status refreshConfiguration(final CliClientService cliClientService, final String groupId,
+ final int timeoutMs) throws InterruptedException, TimeoutException {
+ final Configuration conf = getConfiguration(groupId);
+ if (conf == null) {
+ return new Status(RaftError.ENOENT,
+ "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
+ }
+ final Status st = Status.OK();
+ PeerId leaderId = selectLeader(groupId);
+ if (leaderId == null) {
+ refreshLeader(cliClientService, groupId, timeoutMs);
+ leaderId = selectLeader(groupId);
+ }
+ if (leaderId == null) {
+ st.setError(-1, "Fail to get leader of group %s", groupId);
+ return st;
+ }
+ if (!cliClientService.connect(leaderId.getEndpoint())) {
+ st.setError(-1, "Fail to init channel to %s", leaderId);
+ return st;
+ }
+ final CliRequests.GetPeersRequest.Builder rb = CliRequests.GetPeersRequest.newBuilder();
+ rb.setGroupId(groupId);
+ rb.setLeaderId(leaderId.toString());
+ try {
+ final Message result = cliClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
+ TimeUnit.MILLISECONDS);
+ if (result instanceof CliRequests.GetPeersResponse) {
+ final CliRequests.GetPeersResponse resp = (CliRequests.GetPeersResponse) result;
+ final Configuration newConf = new Configuration();
+ for (final String peerIdStr : resp.getPeersList()) {
+ final PeerId newPeer = new PeerId();
+ newPeer.parse(peerIdStr);
+ newConf.addPeer(newPeer);
+ }
+ updateConfiguration(groupId, newConf);
+ } else {
+ final RpcRequests.ErrorResponse resp = (RpcRequests.ErrorResponse) result;
+ st.setError(resp.getErrorCode(), resp.getErrorMsg());
+ }
+ } catch (final Exception e) {
+ st.setError(-1, e.getMessage());
+ }
+ return st;
+ }
+
+ /**
+ * Reset the states.
+ */
+ public void reset() {
+ this.groupConfTable.clear();
+ }
+
+ /**
+ * Remove the group from route table.
+ *
+ * @param groupId raft group id
+ * @return true on success
+ */
+ public boolean removeGroup(final String groupId) {
+ return this.groupConfTable.remove(groupId) != null;
+ }
+
+ @Override
+ public String toString() {
+ return "RouteTable{" + "groupConfTable=" + groupConfTable + '}';
+ }
+
+ private RouteTable() {
+ }
+
+ private static class GroupConf {
+
+ private final StampedLock stampedLock = new StampedLock();
+
+ private Configuration conf;
+ private PeerId leader;
+
+ @Override
+ public String toString() {
+ return "GroupConf{" + "conf=" + conf + ", leader=" + leader + '}';
+ }
+ }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
new file mode 100644
index 0000000..0b1e8e4
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
@@ -0,0 +1,39 @@
+package org.apache.ignite.raft;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.service.CliClientServiceImpl;
+import org.apache.ignite.raft.service.RouteTable;
+import org.junit.Test;
+
+public class RaftClientTest {
+ private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
+
+ @Test
+ public void testClient() throws TimeoutException, InterruptedException {
+ List<PeerId> peers = Arrays.asList(
+ new PeerId("127.0.0.1", 8080),
+ new PeerId("127.0.0.1", 8081),
+ new PeerId("127.0.0.1", 8082)
+ );
+
+ Configuration initConf = new Configuration(peers);
+
+ String groupId = "unittest";
+
+ RouteTable.getInstance().updateConfiguration(groupId, initConf);
+
+ final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+ cliClientService.init(new RpcOptions());
+
+ if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
+ throw new IllegalStateException("Refresh leader failed");
+ }
+
+ final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
+
+ LOG.log(System.Logger.Level.INFO, "Leader is " + leader);
+ }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java
new file mode 100644
index 0000000..93b12a5
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java
@@ -0,0 +1,321 @@
+package org.apache.ignite.raft.rpc;
+
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.impl.LocalConnection;
+import org.apache.ignite.raft.rpc.impl.LocalRpcClient;
+import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * TODO add test for localconn.close.
+ */
+public class LocalRpcTest {
+ private Endpoint endpoint;
+ private LocalRpcServer server;
+
+ @Before
+ public void setup() {
+ endpoint = new Endpoint();
+ server = new LocalRpcServer(endpoint);
+ server.registerProcessor(new Request1RpcProcessor());
+ server.registerProcessor(new Request2RpcProcessor());
+ server.init(null);
+ }
+
+ @After
+ public void teardown() {
+ server.shutdown();
+
+ assertNull(LocalRpcServer.servers.get(endpoint));
+ }
+
+ @Test
+ public void testStartStopServer() {
+ assertNotNull(LocalRpcServer.servers.get(endpoint));
+ }
+
+ @Test
+ public void testConnection() {
+ LocalRpcClient client = new LocalRpcClient();
+
+ assertFalse(client.checkConnection(endpoint));
+
+ assertTrue(client.checkConnection(endpoint, true));
+ }
+
+ @Test
+ public void testSyncProcessing() throws RemotingException, InterruptedException {
+ RpcClient client = new LocalRpcClient();
+ Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
+ assertNotNull(resp1);
+
+ Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
+ assertNotNull(resp2);
+ }
+
+ @Test
+ public void testAsyncProcessing() throws RemotingException, InterruptedException {
+ RpcClient client = new LocalRpcClient();
+
+ CountDownLatch l1 = new CountDownLatch(1);
+ AtomicReference<Response1> resp1 = new AtomicReference<>();
+ client.invokeAsync(endpoint, new Request1(), new InvokeContext(), (result, err) -> {
+ resp1.set((Response1) result);
+ l1.countDown();
+ }, 5000);
+ l1.await(5_000, TimeUnit.MILLISECONDS);
+ assertNotNull(resp1);
+
+ CountDownLatch l2 = new CountDownLatch(1);
+ AtomicReference<Response2> resp2 = new AtomicReference<>();
+ client.invokeAsync(endpoint, new Request2(), new InvokeContext(), (result, err) -> {
+ resp2.set((Response2) result);
+ l2.countDown();
+ }, 5000);
+ l2.await(5_000, TimeUnit.MILLISECONDS);
+ assertNotNull(resp2);
+ }
+
+ @Test
+ public void testDisconnect1() {
+ RpcClient client1 = new LocalRpcClient();
+ RpcClient client2 = new LocalRpcClient();
+
+ assertTrue(client1.checkConnection(endpoint, true));
+ assertTrue(client2.checkConnection(endpoint, true));
+
+ client1.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertTrue(client2.checkConnection(endpoint));
+
+ client2.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertFalse(client2.checkConnection(endpoint));
+ }
+
+ @Test
+ public void testDisconnect2() {
+ RpcClient client1 = new LocalRpcClient();
+ RpcClient client2 = new LocalRpcClient();
+
+ assertTrue(client1.checkConnection(endpoint, true));
+ assertTrue(client2.checkConnection(endpoint, true));
+
+ server.shutdown();
+
+ assertFalse(client1.checkConnection(endpoint));
+ assertFalse(client2.checkConnection(endpoint));
+ }
+
+ @Test
+ public void testRecordedSync() throws RemotingException, InterruptedException {
+ AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+ RpcClient client1 = new LocalRpcClient();
+ client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+ @Override public void onOpened(String remoteAddress, Connection conn) {
+ LocalConnection locConn = (LocalConnection) conn;
+ connRef.set(locConn);
+ locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+ }
+ });
+
+ assertTrue(client1.checkConnection(endpoint, true));
+
+ client1.invokeSync(endpoint, new Request1(), 500);
+ client1.invokeSync(endpoint, new Request2(), 500);
+
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
+
+ assertEquals(2, recorded.size());
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Response1);
+ }
+
+ @Test
+ public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+ AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+ RpcClient client1 = new LocalRpcClient();
+
+ client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+ @Override public void onOpened(String remoteAddress, Connection conn) {
+ LocalConnection locConn = (LocalConnection) conn;
+ connRef.set(locConn);
+ locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+ }
+ });
+
+ assertTrue(client1.checkConnection(endpoint, true));
+
+ try {
+ Request1 request = new Request1();
+ request.val = 10_000;
+ client1.invokeSync(endpoint, request, 500);
+
+ fail();
+ } catch (InvokeTimeoutException e) {
+ // Expected.
+ }
+
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
+
+ assertEquals(1, recorded.size());
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ }
+
+ @Test
+ public void testRecordedAsync() throws RemotingException, InterruptedException {
+ AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+ RpcClient client1 = new LocalRpcClient();
+
+ client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+ @Override public void onOpened(String remoteAddress, Connection conn) {
+ LocalConnection locConn = (LocalConnection) conn;
+ connRef.set(locConn);
+ locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+ }
+ });
+
+ assertTrue(client1.checkConnection(endpoint, true));
+
+ CountDownLatch l = new CountDownLatch(2);
+
+ client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500);
+ client1.invokeAsync(endpoint, new Request2(), null, (result, err) -> l.countDown(), 500);
+
+ l.await();
+
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
+
+ assertEquals(2, recorded.size());
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Response1);
+ }
+
+ @Test
+ public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+ AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+ RpcClient client1 = new LocalRpcClient();
+
+ client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+ @Override public void onOpened(String remoteAddress, Connection conn) {
+ LocalConnection locConn = (LocalConnection) conn;
+ connRef.set(locConn);
+ locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+ }
+ });
+
+ assertTrue(client1.checkConnection(endpoint, true));
+
+ Request1 request = new Request1();
+ request.val = 10_000;
+ CountDownLatch l = new CountDownLatch(1);
+
+ AtomicReference<Throwable> holder = new AtomicReference<>();
+
+ client1.invokeAsync(endpoint, request, null, new InvokeCallback() {
+ @Override public void complete(Object result, Throwable err) {
+ holder.set(err);
+
+ l.countDown();
+ }
+ }, 500);
+
+ l.await();
+
+ assertTrue(holder.get() instanceof InvokeTimeoutException);
+
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
+
+ assertEquals(1, recorded.size());
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ }
+
+ @Test
+ public void testBlockedSync() throws RemotingException, InterruptedException {
+// RpcClient client1 = new LocalRpcClient();
+//
+// assertTrue(client1.checkConnection(endpoint, true));
+//
+// LocalConnection conn = LocalRpcServer.servers.get(endpoint).conns.get(client1);
+//
+// assertNotNull(conn);
+//
+// conn.recordMessages(msg -> msg instanceof Request1);
+//
+// Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
+//
+// assertEquals(1, resp2.val);
+//
+// Future<Response1> resp = Utils.runInThread(() -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
+//
+// Thread.sleep(3_000);
+//
+// assertFalse(resp.isDone());
+ }
+
+ private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+ @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
+ if (request.val == 10_000)
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ Response1 resp1 = new Response1();
+ resp1.val = request.val + 1;
+ rpcCtx.sendResponse(resp1);
+ }
+
+ @Override public String interest() {
+ return Request1.class.getName();
+ }
+ }
+
+ private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+ @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
+ Response2 resp2 = new Response2();
+ resp2.val = request.val + 1;
+ rpcCtx.sendResponse(resp2);
+ }
+
+ @Override public String interest() {
+ return Request2.class.getName();
+ }
+ }
+
+ private static class Request1 implements Message {
+ int val;
+ }
+
+ private static class Request2 implements Message {
+ int val;
+ }
+
+ private static class Response1 implements Message {
+ int val;
+ }
+
+ private static class Response2 implements Message {
+ int val;
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index db63614..6243a1c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -41,7 +41,7 @@ public class LocalConnection implements Connection {
private void send(Message request, Future fut) {
Object[] tuple = {client, request, fut};
- assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+ srv.incoming.offer(tuple);
}
public void onBeforeRequestSend(Message request, Future fut) {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index db9648a..486971e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -191,13 +191,13 @@ public class LocalRpcServer implements RpcServer {
defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
+ started = true;
+
worker.setName("LocalRPCServer-Dispatch-Thread: " + local.toString());
worker.start();
servers.put(local, this);
- started = true;
-
return true;
}