You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/08/12 12:57:03 UTC

[GitHub] [ignite-3] ibessonov commented on a change in pull request #272: IGNITE-15272 Async raft group service startup

ibessonov commented on a change in pull request #272:
URL: https://github.com/apache/ignite-3/pull/272#discussion_r687680152



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -88,24 +88,26 @@
     private final long retryDelay;
 
     /** */
-    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Review comment:
       Is this necessary?

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
##########
@@ -117,6 +122,14 @@ public ConnectionManager(
      */
     public void start() throws IgniteInternalException {
         try {
+            boolean wasStarted = started.getAndSet(true);

Review comment:
       Have we discussed guarantees that start/stop should provide when called in parallel? Right now I can see a race

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -155,16 +187,48 @@ public RaftGroupServiceImpl(
         return learners;
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> refreshLeader() {
+    /**
+     * Sends a {@link GetLeaderRequest}.
+     *
+     * @param groupId Raft group id.
+     * @param factory Message factory.
+     * @param cluster Cluster service.
+     * @param peers List of all peers.
+     * @param timeout Timeout.
+     * @param retryDelay Retry delay.
+     * @return Future representing pending completion of the request.
+     */
+    private static CompletableFuture<Peer> getLeader(

Review comment:
       Can you make it not static and invoke it on new instance before completing the future in "start"? Too many arguments

##########
File path: modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
##########
@@ -117,7 +117,7 @@ public void testRefreshLeaderStable() throws Exception {
         mockLeaderRequest(false);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
+            RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY).get();

Review comment:
       Maybe we should add timeouts in tests?

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -155,16 +187,48 @@ public RaftGroupServiceImpl(
         return learners;
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> refreshLeader() {
+    /**
+     * Sends a {@link GetLeaderRequest}.
+     *
+     * @param groupId Raft group id.
+     * @param factory Message factory.
+     * @param cluster Cluster service.
+     * @param peers List of all peers.
+     * @param timeout Timeout.
+     * @param retryDelay Retry delay.
+     * @return Future representing pending completion of the request.
+     */
+    private static CompletableFuture<Peer> getLeader(
+        String groupId,
+        RaftClientMessagesFactory factory,
+        ClusterService cluster,
+        List<Peer> peers,
+        long timeout,
+        long retryDelay
+    ) {
         GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
 
-        CompletableFuture<GetLeaderResponse> fut = new CompletableFuture<>();
-
-        sendWithRetry(randomNode(), req, currentTimeMillis() + timeout, fut);
+        var fut = new CompletableFuture<GetLeaderResponse>();
+
+        sendWithRetry(
+            peers,
+            randomNode(peers),
+            req,
+            currentTimeMillis() + timeout,
+            cluster,
+            timeout,
+            retryDelay,
+            leader -> {},
+            fut
+        );
+
+        return fut.thenApply(GetLeaderResponse::leader);
+    }
 
-        return fut.thenApply(resp -> {
-            leader = resp.leader();
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> refreshLeader() {
+        return getLeader(groupId, factory, cluster, peers, timeout, retryDelay).thenApply(leader -> {

Review comment:
       Does "thenAccept" fit better here?

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
##########
@@ -232,11 +245,15 @@ public void addListener(BiConsumer<SocketAddress, NetworkMessage> listener) {
      * Stops the server and all clients.
      */
     public void stop() {
-         Stream<CompletableFuture<Void>> stream = Stream.concat(
+        boolean wasStopped = this.stopped.getAndSet(true);
+
+        if (wasStopped)
+            return;
+
+        Stream<CompletableFuture<Void>> stream = Stream.concat(
             clients.values().stream().map(NettyClient::stop),
             Stream.of(server.stop())
         );
-

Review comment:
       Can you please put it back?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org