Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/27 17:18:03 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #147: IGNITE-14567

sashapolo commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640795892



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
##########
@@ -90,22 +91,31 @@ void fireEvent(Message message) {
             .withData(msg)
             .correlationId(correlationId)
             .build();
+
         return cluster
             .send(clusterNodeAddress(recipient), message)
             .toFuture();
     }
 
     /** {@inheritDoc} */
-    @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+    @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, final NetworkMessage msg, long timeout) {

Review comment:
       ```suggestion
       @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
   ```

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
         if (leader == null)
             return refreshLeader().thenCompose(res -> run(cmd));
 
-        ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+        CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
 
-        CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+        sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.thenApply(resp -> resp.result());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+        CompletableFuture fut = cluster.messagingService().invoke(peer.address(), req, timeout);
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        return fut.thenApply(resp -> ((ActionResponse) resp).result());
+    }
 
-        return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        if (!reuse)
+            cluster.shutdown();
     }
 
-    private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
-        if (currentTimeMillis() >= stopTime)
-            return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.messagingService().invoke(node, req, timeout)
-            .thenCompose(resp -> {
-                if (resp instanceof RaftErrorResponse) {
-                    RaftErrorResponse resp0 = (RaftErrorResponse)resp;
-                    switch (resp0.errorCode()) {
-                        case NO_LEADER:
-                            return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
-                        case LEADER_CHANGED:
-                            leader = resp0.newLeader();
-                            return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
-                        case SUCCESS:
-                            return CompletableFuture.completedFuture(null);
-                        default:
-                            return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
+    /**
+     * Retries request until success or timeout.
+     *
+     * @param addr Target address.
+     * @param req The request.
+     * @param stopTime Stop time.
+     * @param fut The future.
+     * @param <R> Response type.
+     */
+    private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {

Review comment:
       Why did you change the previous approach for implementing this method? I think it looked more elegant and readable. It is possible to implement the current logic in a similar way. Here's what I was able to come up with:
   
   ```
   private <R> CompletableFuture<R> sendWithRetry(Peer peer, Object req, long stopTime) {
       if (currentTimeMillis() >= stopTime) {
           return CompletableFuture.failedFuture(new TimeoutException());
       }
   
       return cluster.messagingService().invoke(peer.address(), (NetworkMessage)req, timeout)
           .handle((resp, err) -> {
               if (err != null) {
                   if (recoverable(err)) {
                       return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                   }
                   else {
                       return CompletableFuture.<R>failedFuture(err);
                   }
               }
               else if (resp instanceof RaftErrorResponse) {
                   RaftErrorResponse resp0 = (RaftErrorResponse)resp;
                   RaftErrorCode errorCode = resp0.errorCode();
   
                   if (errorCode == null) {
                       leader = peer;
   
                       return CompletableFuture.<R>completedFuture(null);
                   }
   
                   switch (errorCode) {
                       case NO_LEADER:
                           return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                       case LEADER_CHANGED:
                           leader = resp0.newLeader(); // Update a leader.
   
                           return composeWithDelay(() -> this.<R>sendWithRetry(resp0.newLeader(), req, stopTime));
                       default:
                           return CompletableFuture.<R>failedFuture(new RaftException(errorCode));
                   }
               }
               else {
                   leader = peer;
   
                   return CompletableFuture.completedFuture((R)resp);
               }
           })
           .thenCompose(Function.identity());
   }
   ```
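   
   For reference, the core of the pattern (`handle` maps either outcome to the next future, `thenCompose` flattens it) can be tried in isolation. This is only a minimal sketch under stated assumptions: `RetrySketch`, the `Supplier`-based signature and the rule that an `IOException` counts as recoverable are all hypothetical and not taken from the PR, and the retry happens immediately instead of going through `composeWithDelay`.
   
   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Function;
   import java.util.function.Supplier;
   
   // Hypothetical stand-alone sketch; none of these names are taken from the PR.
   final class RetrySketch {
       /** Assumption for this sketch only: I/O failures are worth retrying. */
       static boolean recoverable(Throwable err) {
           Throwable cause = err instanceof CompletionException && err.getCause() != null ? err.getCause() : err;
   
           return cause instanceof IOException;
       }
   
       /** Repeats {@code call} until it succeeds, fails unrecoverably, or the deadline passes. */
       static <R> CompletableFuture<R> sendWithRetry(Supplier<CompletableFuture<R>> call, long stopTime) {
           if (System.currentTimeMillis() >= stopTime)
               return CompletableFuture.failedFuture(new TimeoutException());
   
           // handle() sees both outcomes and maps each one to the next future to wait on.
           CompletableFuture<CompletableFuture<R>> next = call.get().handle((resp, err) -> {
               if (err == null)
                   return CompletableFuture.completedFuture(resp);
   
               if (recoverable(err))
                   return sendWithRetry(call, stopTime); // Immediate retry; no delay in this sketch.
   
               return CompletableFuture.<R>failedFuture(err);
           });
   
           // Flatten the nested future into the one returned to the caller.
           return next.thenCompose(Function.identity());
       }
   
       public static void main(String[] args) {
           AtomicInteger attempts = new AtomicInteger();
   
           // Fails twice with a recoverable error, then succeeds.
           Supplier<CompletableFuture<String>> flaky = () -> attempts.incrementAndGet() < 3
               ? CompletableFuture.<String>failedFuture(new IOException("transient"))
               : CompletableFuture.completedFuture("ok");
   
           String res = sendWithRetry(flaky, System.currentTimeMillis() + 5_000).join();
   
           System.out.println(res + " after " + attempts.get() + " attempts");
       }
   }
   ```
   
   The caller gets a single future back and no future has to be passed in as a parameter, which is the main readability gain over the callback style.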
   
   What do you think?

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       This getter is actually never used, only the setter. Can you also please elaborate on where the `readOnlySafe` concept comes from? I don't understand how it is going to be used for linearization.

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
##########
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.raft.client.service;
 
+import java.io.Serializable;
 import org.apache.ignite.raft.client.Command;
 
 /**
- * A closure to notify abbout command processing outcome.
+ * A closure to notify about command processing outcome.

Review comment:
       ```suggestion
    * A closure to notify about a command processing outcome.
   ```

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -261,9 +286,21 @@ public RaftGroupServiceImpl(
     @Override public CompletableFuture<Void> snapshot(Peer peer) {
         SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        // Disable the timeout for a snapshot request.
+        CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(peer.address(), req, Integer.MAX_VALUE);
 
-        return fut.thenApply(resp -> null);
+        return fut.handle(new BiFunction<NetworkMessage, Throwable, Void>() {
+            @Override public Void apply(NetworkMessage resp, Throwable throwable) {

Review comment:
       Why do you ignore the `throwable` that might be passed to this method? Should we use `thenApply` instead?
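   
   To illustrate the concern, here is a minimal stand-alone sketch of the difference, assuming the callback never inspects its throwable. The class and method names are hypothetical and not from the PR.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical stand-alone sketch; the class and method names are not from the PR.
   final class HandleVsThenApply {
       /** A handle() callback that ignores its throwable turns a failure into a normal null result. */
       static CompletableFuture<Void> swallowing(CompletableFuture<String> fut) {
           return fut.handle((resp, err) -> null);
       }
   
       /** thenApply() skips the callback on failure and passes the failure on to the caller. */
       static CompletableFuture<Void> propagating(CompletableFuture<String> fut) {
           return fut.thenApply(resp -> null);
       }
   
       public static void main(String[] args) {
           CompletableFuture<String> failed = CompletableFuture.failedFuture(new IllegalStateException("boom"));
   
           System.out.println(swallowing(failed).isCompletedExceptionally());  // false
           System.out.println(propagating(failed).isCompletedExceptionally()); // true
       }
   }
   ```
   
   With `handle`, a failed snapshot request would look like a success to the caller unless the callback rethrows or otherwise reports the error.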



