You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/27 11:23:47 UTC

[ignite-3] 01/04: IGNITE-14149 Cleanup.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 092bc39b09739a5171b11316cec0c96615dfb13b
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 27 12:32:06 2021 +0300

    IGNITE-14149 Cleanup.
---
 .../main/java/org/apache/ignite/raft/PeerId.java   |   4 +
 .../main/java/org/apache/ignite/raft/State.java    | 265 +--------------------
 .../raft/client/RaftClientCommonMessages.java      |   2 +-
 .../RaftClientCommonMessageBuilderFactory.java     |   2 -
 .../ignite/raft/client/rpc/RaftGroupRpcClient.java |   6 +-
 .../client/rpc/impl/RaftGroupRpcClientImpl.java    | 117 +++++----
 .../service/RaftGroupClientRequestListener.java    |   2 +-
 .../service/RaftGroupClientRequestService.java     |   7 +
 .../impl/RaftGroupManagementServiceImpl.java       |   8 +-
 .../java/org/apache/ignite/raft/rpc/NodeImpl.java  |   2 +-
 .../apache/ignite/raft/rpc/RaftGroupMessage.java   |   3 +
 .../java/org/apache/ignite/raft/rpc/RpcClient.java |   8 +-
 .../ignite/raft/client/RaftGroupRpcClientTest.java | 116 ++++++++-
 13 files changed, 212 insertions(+), 330 deletions(-)

diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
index 56d96be..7b5fec0 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -75,4 +75,8 @@ public class PeerId implements Serializable {
         result = 31 * result + priority;
         return result;
     }
+
+    @Override public String toString() {
+        return node.id() + ":" + priority;
+    }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
index 5f5fb15..5001d74 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
@@ -16,271 +16,16 @@
  */
 package org.apache.ignite.raft;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Raft group state.
  */
-public class State implements Iterable<PeerId> {
-    /** Mark a leaner peer */
-    private static final String LEARNER_POSTFIX = "/learner";
+public interface State  {
+    @Nullable PeerId leader();
 
-    private volatile PeerId leader;
+    @Nullable List<PeerId> peers();
 
-    private List<PeerId> peers = new ArrayList<>();
-
-    // use LinkedHashSet to keep insertion order.
-    private List<PeerId> learners = new ArrayList<>();
-
-    public State() {
-        super();
-    }
-
-    /**
-     * Construct a configuration instance with peers.
-     *
-     * @param conf configuration
-     */
-    public State(final Iterable<PeerId> conf) {
-        this(conf, null);
-    }
-
-    /**
-     * Construct a configuration from another conf.
-     *
-     * @param conf configuration
-     */
-    public State(final State conf) {
-        this(conf.getPeers(), conf.getLearners());
-    }
-
-    /**
-     * Construct a Configuration instance with peers and learners.
-     *
-     * @param conf     peers configuration
-     * @param learners learners
-     * @since 1.3.0
-     */
-    public State(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
-        for (final PeerId peer : conf) {
-            this.peers.add(new PeerId(peer));
-        }
-        addLearners(learners);
-    }
-
-    public PeerId getLeader() {
-        return leader;
-    }
-
-    public void setLeader(PeerId leader) {
-        this.leader = leader;
-    }
-
-    public void setLearners(final List<PeerId> learners) {
-        this.learners = learners;
-    }
-
-    /**
-     * Add a learner peer.
-     *
-     * @param learner learner to add
-     * @return true when add successfully.
-     */
-    public boolean addLearner(final PeerId learner) {
-        return this.learners.add(learner);
-    }
-
-    /**
-     * Add learners in batch, returns the added count.
-     *
-     * @param learners learners to add
-     * @return the total added count
-     */
-    public int addLearners(final Iterable<PeerId> learners) {
-        int ret = 0;
-        if (learners != null) {
-            for (final PeerId peer : learners) {
-                if (this.learners.add(new PeerId(peer))) {
-                    ret++;
-                }
-            }
-        }
-        return ret;
-    }
-
-    /**
-     * Remove a learner peer.
-     *
-     * @param learner learner to remove
-     * @return true when remove successfully.
-     */
-    public boolean removeLearner(final PeerId learner) {
-        return this.learners.remove(learner);
-    }
-
-    /**
-     * Retrieve the learners set.
-     *
-     * @return learners
-     */
-    public List<PeerId> getLearners() {
-        return this.learners;
-    }
-
-    /**
-     * Retrieve the learners set copy.
-     *
-     * @return learners
-     */
-    public List<PeerId> listLearners() {
-        return new ArrayList<>(this.learners);
-    }
-
-    /**
-     * Returns true when the configuration is valid.
-     *
-     * @return true if the configuration is valid.
-     */
-    public boolean isValid() {
-        final Set<PeerId> intersection = new HashSet<>(this.peers);
-        intersection.retainAll(this.learners);
-        return !this.peers.isEmpty() && intersection.isEmpty();
-    }
-
-    public void reset() {
-        this.peers.clear();
-        this.learners.clear();
-    }
-
-    public boolean isEmpty() {
-        return this.peers.isEmpty();
-    }
-
-    /**
-     * Returns the peers total number.
-     *
-     * @return total num of peers
-     */
-    public int size() {
-        return this.peers.size();
-    }
-
-    @Override
-    public Iterator<PeerId> iterator() {
-        return this.peers.iterator();
-    }
-
-    public Set<PeerId> getPeerSet() {
-        return new HashSet<>(this.peers);
-    }
-
-    public List<PeerId> listPeers() {
-        return new ArrayList<>(this.peers);
-    }
-
-    public List<PeerId> getPeers() {
-        return this.peers;
-    }
-
-    public void setPeers(final List<PeerId> peers) {
-        this.peers.clear();
-        for (final PeerId peer : peers) {
-            this.peers.add(new PeerId(peer));
-        }
-    }
-
-    public void appendPeers(final Collection<PeerId> set) {
-        this.peers.addAll(set);
-    }
-
-    public boolean addPeer(final PeerId peer) {
-        return this.peers.add(peer);
-    }
-
-    public boolean removePeer(final PeerId peer) {
-        return this.peers.remove(peer);
-    }
-
-    public boolean contains(final PeerId peer) {
-        return this.peers.contains(peer);
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((this.learners == null) ? 0 : this.learners.hashCode());
-        result = prime * result + ((this.peers == null) ? 0 : this.peers.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        State other = (State) obj;
-        if (this.learners == null) {
-            if (other.learners != null) {
-                return false;
-            }
-        } else if (!this.learners.equals(other.learners)) {
-            return false;
-        }
-        if (this.peers == null) {
-            return other.peers == null;
-        } else {
-            return this.peers.equals(other.peers);
-        }
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        final List<PeerId> peers = listPeers();
-        int i = 0;
-        int size = peers.size();
-        for (final PeerId peer : peers) {
-            sb.append(peer);
-            if (i < size - 1 || !this.learners.isEmpty()) {
-                sb.append(",");
-            }
-            i++;
-        }
-
-        size = this.learners.size();
-        i = 0;
-        for (final PeerId peer : this.learners) {
-            sb.append(peer).append(LEARNER_POSTFIX);
-            if (i < size - 1) {
-                sb.append(",");
-            }
-            i++;
-        }
-
-        return sb.toString();
-    }
-
-    /**
-     * Get the difference between |*this| and |rhs|
-     * |included| would be assigned to |*this| - |rhs|
-     * |excluded| would be assigned to |rhs| - |*this|
-     */
-    public void diff(final State rhs, final State included, final State excluded) {
-        included.peers = new ArrayList<>(this.peers);
-        included.peers.removeAll(rhs.peers);
-        excluded.peers = new ArrayList<>(rhs.peers);
-        excluded.peers.removeAll(this.peers);
-    }
+    @Nullable List<PeerId> learners();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
index 30b1cb5..5a40dc6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
@@ -257,7 +257,7 @@ public final class RaftClientCommonMessages {
         }
     }
 
-    public interface UserRequest<T> extends Message {
+    public interface UserRequest<T> extends RaftGroupMessage {
         T request();
 
         public interface Builder<T> {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
index 45f070a..440cee2 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
@@ -6,8 +6,6 @@ import org.apache.ignite.raft.client.RaftClientCommonMessages;
  * Raft client message builders factory.
  */
 public class RaftClientCommonMessageBuilderFactory implements ClientMessageBuilderFactory {
-    public static RaftClientCommonMessageBuilderFactory DEFAULT = new RaftClientCommonMessageBuilderFactory();
-
     @Override public RaftClientCommonMessages.PingRequest.Builder createPingRequest() {
         return new PingRequestImpl();
     }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 986fd71..5f657eb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -46,12 +46,12 @@ import static org.apache.ignite.raft.client.RaftClientCommonMessages.TransferLea
 public interface RaftGroupRpcClient {
     /**
      * @param groupId Group id.
-     * @return Current group state or null if state is not yet initalized.
+     * @return Group state snapshot.
      */
-    @Nullable State state(String groupId);
+    State state(String groupId);
 
     /**
-     * Refreshes a state of initialized group.
+     * Refreshes a group leader.
      * @param groupId Group id.
      * @return A future.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index d7dc4c8..f0ceda6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -1,6 +1,7 @@
 package org.apache.ignite.raft.client.rpc.impl;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -8,20 +9,25 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
 import org.apache.ignite.raft.PeerId;
 import org.apache.ignite.raft.RaftException;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.client.RaftClientCommonMessages;
+import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderResponse;
 import org.apache.ignite.raft.client.RaftClientCommonMessages.StatusResponse;
+import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
-import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
 import org.apache.ignite.raft.rpc.InvokeCallback;
 import org.apache.ignite.raft.rpc.Message;
 import org.apache.ignite.raft.rpc.Node;
 import org.apache.ignite.raft.rpc.RaftGroupMessage;
 import org.apache.ignite.raft.rpc.RpcClient;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory.DEFAULT;
 
 public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     private final ExecutorService executor;
@@ -31,7 +37,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     /** Where to ask for initial configuration. */
     private final Set<Node> initialCfgNodes;
 
-    private Map<String, State> states = new ConcurrentHashMap<>();
+    /** Message builder factory. */
+    private final ClientMessageBuilderFactory factory;
+
+    private Map<String, StateImpl> states = new ConcurrentHashMap<>();
 
     /**
      * Accepts dependencies in constructor.
@@ -39,17 +48,22 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
      * @param defaultTimeout
      * @param initialCfgNode Initial configuration nodes.
      */
-    public RaftGroupRpcClientImpl(RpcClient rpcClient, int defaultTimeout, Set<Node> initialCfgNodes) {
+    public RaftGroupRpcClientImpl(RpcClient rpcClient, ClientMessageBuilderFactory factory, int defaultTimeout, Set<Node> initialCfgNodes) {
         this.defaultTimeout = defaultTimeout;
         this.rpcClient = rpcClient;
+        this.factory = factory;
         this.initialCfgNodes = new HashSet<>(initialCfgNodes);
         executor = Executors.newWorkStealingPool();
     }
 
     @Override public State state(String groupId) {
-        State newState = new State();
+        return getState(groupId);
+    }
+
+    private StateImpl getState(String groupId) {
+        StateImpl newState = new StateImpl();
 
-        State state = states.putIfAbsent(groupId, newState);
+        StateImpl state = states.putIfAbsent(groupId, newState);
 
         if (state == null)
             state = newState;
@@ -58,28 +72,16 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     }
 
     @Override public CompletableFuture<PeerId> refreshLeader(String groupId) {
-        State state = state(groupId);
+        StateImpl state = getState(groupId);
 
-        if (state.getLeader() == null) {
-            synchronized (state) {
-                PeerId leader = state.getLeader();
+        return refreshLeader(initialCfgNodes.iterator().next(), groupId).
+            thenApply(resp -> {
+                PeerId leaderId = resp.getLeaderId();
 
-                if (leader == null) {
-                    return refreshLeader(initialCfgNodes.iterator().next(), groupId).
-                        thenApply(new Function<RaftClientCommonMessages.GetLeaderResponse, PeerId>() {
-                            @Override public PeerId apply(RaftClientCommonMessages.GetLeaderResponse getLeaderResponse) {
-                                PeerId leaderId = getLeaderResponse.getLeaderId();
+                state.leader = leaderId;
 
-                                state.setLeader(leaderId);
-
-                                return leaderId;
-                            }
-                        });
-                }
-            }
-        }
-
-        return CompletableFuture.completedFuture(state.getLeader());
+                return leaderId;
+            });
     }
 
     @Override public CompletableFuture<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
@@ -118,21 +120,34 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return null;
     }
 
-    private CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> refreshLeader(Node node, String groupId) {
-        RaftClientCommonMessages.GetLeaderRequest req = RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+    private CompletableFuture<GetLeaderResponse> refreshLeader(Node node, String groupId) {
+        StateImpl state = getState(groupId);
 
-        CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut = new CompletableFuture<>();
+        while(true) {
+            CompletableFuture<GetLeaderResponse> fut = state.updateFutRef.get();
 
-        rpcClient.invokeAsync(node, req, new InvokeCallback<RaftClientCommonMessages.GetLeaderResponse>() {
-            @Override public void complete(RaftClientCommonMessages.GetLeaderResponse response, Throwable err) {
-                if (err != null)
-                    fut.completeExceptionally(err);
-                else
-                    fut.complete(response);
-            }
-        }, executor, defaultTimeout);
+            if (fut != null)
+                return fut;
 
-        return fut;
+            if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
+                RaftClientCommonMessages.GetLeaderRequest req = DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+
+                CompletableFuture<GetLeaderResponse> finalFut = fut;
+
+                rpcClient.invokeAsync(node, req, new InvokeCallback<GetLeaderResponse>() {
+                    @Override public void complete(GetLeaderResponse response, Throwable err) {
+                        if (err != null)
+                            finalFut.completeExceptionally(err);
+                        else
+                            finalFut.complete(response);
+
+                        state.updateFutRef.set(null);
+                    }
+                }, executor, defaultTimeout);
+
+                return fut;
+            }
+        }
     }
 
     @Override public <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request) {
@@ -142,13 +157,13 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
 
         fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
 
-        CompletableFuture<PeerId> fut0 = refreshLeader(request.getGroupId());
+        CompletableFuture<PeerId> fut0 = state.leader() == null ?
+            refreshLeader(request.getGroupId()) : completedFuture(state.leader());
 
-        // TODO implement clean chaining.
         fut0.whenComplete(new BiConsumer<PeerId, Throwable>() {
             @Override public void accept(PeerId peerId, Throwable error) {
                 if (error == null) {
-                    rpcClient.invokeAsync(state.getLeader().getNode(), request, new InvokeCallback<R>() {
+                    rpcClient.invokeAsync(peerId.getNode(), request, new InvokeCallback<R>() {
                         @Override public void complete(R response, Throwable err) {
                             if (err != null)
                                 fut.completeExceptionally(err);
@@ -175,4 +190,26 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
 
         return fut;
     }
+
+    private static class StateImpl implements State {
+        private volatile PeerId leader;
+
+        private volatile List<PeerId> peers;
+
+        private volatile List<PeerId> learners;
+
+        private AtomicReference<CompletableFuture<GetLeaderResponse>> updateFutRef = new AtomicReference<>();
+
+        @Override public @Nullable PeerId leader() {
+            return leader;
+        }
+
+        @Override public @Nullable List<PeerId> peers() {
+            return peers;
+        }
+
+        @Override public @Nullable List<PeerId> learners() {
+            return learners;
+        }
+    }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
index 3649a41..c007086 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
@@ -3,7 +3,7 @@ package org.apache.ignite.raft.client.service;
 import java.util.Iterator;
 
 /**
- * A listener for raft group clien requests.
+ * A listener for raft group client requests.
  */
 public interface RaftGroupClientRequestListener {
     void onReads(Iterator iterator);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
index 9fdc19e..a771bc9 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
@@ -6,5 +6,12 @@ import java.util.concurrent.CompletableFuture;
  *
  */
 public interface RaftGroupClientRequestService {
+    /**
+     * Submits a custom request to a raft group leader. If a leader is not known yet, will try to resolve the leader.
+     * @param request Request to submit.
+     * @param <T> Request type.
+     * @param <R> Response type.
+     * @return A future for the response.
+     */
     <T, R> CompletableFuture<R> submit(T request);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
index ff3749f..a70c1fb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
@@ -3,10 +3,8 @@ package org.apache.ignite.raft.client.service.impl;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.service.RaftGroupManagmentService;
-import org.apache.ignite.raft.rpc.Node;
 import org.jetbrains.annotations.Nullable;
 
 public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService {
@@ -17,15 +15,15 @@ public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService
     }
 
     @Override public @Nullable PeerId getLeader(String groupId) {
-        return rpcClient.state(groupId).getLeader();
+        return rpcClient.state(groupId).leader();
     }
 
     @Override public @Nullable List<PeerId> getPeers(String groupId) {
-        return rpcClient.state(groupId).getPeers();
+        return rpcClient.state(groupId).peers();
     }
 
     @Override public @Nullable List<PeerId> getLearners(String groupId) {
-        return rpcClient.state(groupId).getLearners();
+        return rpcClient.state(groupId).learners();
     }
 
     @Override public CompletableFuture<PeersChangeState> addPeer(PeerId peerId) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
index f71424f..ea6f10c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
@@ -29,7 +29,7 @@ public class NodeImpl implements Node {
     }
 
     @Override public String id() {
-        return null;
+        return id;
     }
 
     @Override public boolean equals(Object o) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
index 0ee4561..97502ce 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
@@ -1,5 +1,8 @@
 package org.apache.ignite.raft.rpc;
 
+/**
+ * A message for a specific raft group.
+ */
 public interface RaftGroupMessage extends Message {
     public String getGroupId();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
index cdb6db7..4231dae 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
@@ -28,7 +28,7 @@ public interface RpcClient {
      * @param node target address
      * @return true if there is a connection and the connection is active and writable.
      */
-    boolean checkConnection(final NodeImpl node);
+    boolean checkConnection(final Node node);
 
     /**
      * Check connection for given address and async to create a new one if there is no connection.
@@ -37,14 +37,14 @@ public interface RpcClient {
      * @param createIfAbsent create a new one if there is no connection
      * @return true if there is a connection and the connection is active and writable.
      */
-    boolean checkConnection(final NodeImpl node, final boolean createIfAbsent);
+    boolean checkConnection(final Node node, final boolean createIfAbsent);
 
     /**
      * Close all connections of a address.
      *
      * @param node target address
      */
-    void closeConnection(final NodeImpl node);
+    void closeConnection(final Node node);
 
     /**
      * Asynchronous invocation with a callback.
@@ -53,7 +53,7 @@ public interface RpcClient {
      * @param request   Request object
      * @param callback  Invoke callback.
      * @param executor  Executor to run invoke callback.
-     * @param timeoutMs Timeout millisecond
+     * @param timeoutMs Timeout millisecond.
      */
     void invokeAsync(final Node node, final Message request, InvokeCallback callback, Executor executor, final long timeoutMs);
 }
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index e7012e2..d755e86 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -1,10 +1,14 @@
 package org.apache.ignite.raft.client;
 
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.raft.PeerId;
 import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderRequest;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
 import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
@@ -15,6 +19,7 @@ import org.apache.ignite.raft.rpc.RaftGroupMessage;
 import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -23,6 +28,7 @@ import org.mockito.stubbing.Answer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -35,25 +41,104 @@ public class RaftGroupRpcClientTest {
     private static PeerId leader = new PeerId(new NodeImpl("test"));
 
     @Test
+    public void testRefreshLeader() throws Exception {
+        String groupId = "test";
+
+        mockLeaderRequest(false);
+
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+            5_000, Collections.singleton(leader.getNode()));
+
+        PeerId leaderId = client.refreshLeader(groupId).get();
+
+        assertEquals(leader, client.state(groupId).leader());
+        assertEquals(leader, leaderId);
+    }
+
+    @Test
+    public void testRefreshLeaderMultithreaded() throws Exception {
+        String groupId = "test";
+
+        mockLeaderRequest(false);
+
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+            5_000, Collections.singleton(leader.getNode()));
+
+        int cnt = 20;
+
+        Thread[] runners = new Thread[cnt];
+
+        CountDownLatch l = new CountDownLatch(cnt);
+
+        for (int i = 0; i < runners.length; i++) {
+            runners[i] = new Thread(new Runnable() {
+                @Override public void run() {
+                    l.countDown();
+                    try {
+                        l.await();
+                    }
+                    catch (InterruptedException e) {
+                        // Ignored.
+                    }
+
+                    try {
+                        PeerId leaderId = client.refreshLeader(groupId).get();
+
+                        assertEquals(leader, client.state(groupId).leader());
+                        assertEquals(leader, leaderId);
+                    }
+                    catch (Exception e) {
+                        fail(e);
+                    }
+                }
+            });
+            runners[i].setName("Executor-" + i);
+            runners[i].start();
+        }
+
+        for (int i = 0; i < runners.length; i++)
+            runners[i].join();
+    }
+
+    @Test
+    public void testRefreshLeaderTimeout() throws Exception {
+        String groupId = "test";
+
+        mockLeaderRequest(true);
+
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+            5_000, Collections.singleton(leader.getNode()));
+
+        try {
+            client.refreshLeader(groupId).get();
+
+            fail();
+        }
+        catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
+    @Test
     public void testCustomMessage() throws Exception {
         String groupId = "test";
 
-        mockClient();
+        mockLeaderRequest(false);
+        mockCustomRequest();
 
-        // TODO FIXME asch where to get initial configuration for the group ?
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, 5_000, Collections.singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+            5_000, Collections.singleton(leader.getNode()));
 
         JunkRequest req = new JunkRequest(groupId);
 
-        Future<Message> fut = client.sendCustom(req);
+        Message resp = client.sendCustom(req).get();
 
         State state = client.state(groupId);
 
         // Expecting raft group state to be transparently loaded on first request.
-        // TODO FIXME broken
-        assertEquals(leader, state.getLeader());
+        assertEquals(leader, state.leader());
 
-        assertTrue(fut.get() instanceof JunkResponse);
+        assertTrue(resp instanceof JunkResponse);
     }
 
     private static class JunkRequest implements RaftGroupMessage {
@@ -70,8 +155,7 @@ public class RaftGroupRpcClientTest {
 
     private static class JunkResponse implements Message {}
 
-    private void mockClient() {
-        // Mock junk request.
+    private void mockCustomRequest() {
         Mockito.doAnswer(new Answer() {
             @Override public Object answer(InvocationOnMock invocation) throws Throwable {
                 InvokeCallback callback = invocation.getArgument(2);
@@ -82,17 +166,23 @@ public class RaftGroupRpcClientTest {
                 return null;
             }
         }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
+    }
 
-        // Mock get leader request.
+    private void mockLeaderRequest(boolean timeout) {
         Mockito.doAnswer(new Answer() {
             @Override public Object answer(InvocationOnMock invocation) throws Throwable {
                 InvokeCallback callback = invocation.getArgument(2);
                 Executor executor = invocation.getArgument(3);
 
-                executor.execute(() -> callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null));
+                executor.execute(() -> {
+                    if (timeout)
+                        callback.complete(null, new TimeoutException());
+                    else
+                        callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null);
+                });
 
                 return null;
             }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientCommonMessages.GetLeaderRequest.class), any(), any(), anyLong());
+        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(GetLeaderRequest.class), any(), any(), anyLong());
     }
 }