You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ratis.apache.org by "Tsz Wo Nicholas Sze (JIRA)" <ji...@apache.org> on 2017/11/06 23:22:00 UTC
[jira] [Commented] (RATIS-113) Add Async send interface to
RaftClient
[ https://issues.apache.org/jira/browse/RATIS-113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241168#comment-16241168 ]
Tsz Wo Nicholas Sze commented on RATIS-113:
-------------------------------------------
Thanks a lot! This is a difficult patch. Some comments:
- RaftClientRpc.sendRequestAsync should not throw IOException. The exception should be set to the returned CompletableFuture.
-* Also, add default implementation as follows.
{code}
default CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support sendRequestAsync");
}
{code}
- In GrpcClientRpc, let's do not handle ReinitializeRequest, SetConfigurationRequest and ServerInformatonRequest in sendRequestAsync. We assume that the request in sendRequestAsync must be a normal RaftClientRequest.
- In RaftClientImpl, sendRequestWithRetryAsync calls sendRequest which is not async. It should calls sendRequestWithRetryAsync again; see the following code.
{code}
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
Supplier<RaftClientRequest> supplier) {
return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> {
final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
if (reply == null || reply.shouldRetry()) {
final TimeUnit unit = retryInterval.getUnit();
scheduler.schedule(() -> sendRequestWithRetryAsync(supplier).thenApply(r -> f.complete(r)),
retryInterval.toLong(unit), unit);
} else {
f.complete(reply);
}
return f;
});
}
{code}
-* setServerId (and the change in RaftClientMessage) below is not needed since supplier.get() will generate a new request.
{code}
@@ -205,6 +287,7 @@ final class RaftClientImpl implements RaftClient {
if (newLeader != null && oldLeader.equals(leaderId)) {
LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader);
this.leaderId = newLeader;
+ request.setServerId(newLeader);
}
{code}
- The change in RaftClientReply is not needed since it should retry only if reply == null.
- Add similar tests as testBasicAppendEntries() and testWithLoad() in RaftBasicTests. After that, TestRaftClient may not be needed.
- Please revert the white space change in RaftClientImpl (e.g. @@ -106,8 +126,8 @@, @@ -115,8 +135,8 @@), HadoopClientRpc and NettyClientRpc.
> Add Async send interface to RaftClient
> --------------------------------------
>
> Key: RATIS-113
> URL: https://issues.apache.org/jira/browse/RATIS-113
> Project: Ratis
> Issue Type: Bug
> Reporter: Mukul Kumar Singh
> Assignee: Lokesh Jain
> Attachments: RATIS-113.001.patch
>
>
> Raft Client currently only has a sync interface, an sync interface is needed for ozone
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)