You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/27 11:37:47 UTC
[incubator-ratis] branch master updated: RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d5a3e05 RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.
d5a3e05 is described below
commit d5a3e05937d6b05b82d023148102de3d7bc9898c
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Apr 27 17:06:45 2020 +0530
RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.
---
.../java/org/apache/ratis/netty/NettyFactory.java | 2 +-
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 21 +++++++++----
.../apache/ratis/netty/client/NettyClientRpc.java | 5 ++--
.../apache/ratis/netty/server/NettyRpcService.java | 2 +-
.../ratis/server/impl/LeaderElectionTests.java | 35 ++++++++++++++++++++++
.../ratis/grpc/TestLeaderElectionWithGrpc.java | 5 ++++
.../ratis/netty/TestLeaderElectionWithNetty.java | 6 ++++
7 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 5e9c2e9..450b95f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -42,6 +42,6 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac
@Override
public NettyClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) {
- return new NettyClientRpc(clientId);
+ return new NettyClientRpc(clientId, properties);
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 1c83629..15e54f4 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -17,7 +17,10 @@
*/
package org.apache.ratis.netty;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.thirdparty.io.netty.channel.*;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
@@ -31,6 +34,7 @@ import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
import java.io.Closeable;
import java.io.IOException;
@@ -38,22 +42,25 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
public class NettyRpcProxy implements Closeable {
public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
private final EventLoopGroup group = new NioEventLoopGroup();
+ private final RaftProperties properties;
- public PeerMap(String name) {
+ public PeerMap(String name, RaftProperties properties) {
super(name);
+ this.properties = properties;
}
@Override
public NettyRpcProxy createProxyImpl(RaftPeer peer)
- throws IOException {
+ throws IOException {
try {
- return new NettyRpcProxy(peer, group);
+ return new NettyRpcProxy(peer, properties, group);
} catch (InterruptedException e) {
throw IOUtils.toInterruptedIOException("Failed connecting to " + peer, e);
}
@@ -153,10 +160,12 @@ public class NettyRpcProxy implements Closeable {
private final RaftPeer peer;
private final Connection connection;
+ private final TimeDuration requestTimeoutDuration;
- public NettyRpcProxy(RaftPeer peer, EventLoopGroup group) throws InterruptedException {
+ public NettyRpcProxy(RaftPeer peer, RaftProperties properties, EventLoopGroup group) throws InterruptedException {
this.peer = peer;
this.connection = new Connection(group);
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
@Override
@@ -172,12 +181,14 @@ public class NettyRpcProxy implements Closeable {
try {
channelFuture.sync();
- return reply.get();
+ return reply.get(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit());
} catch (InterruptedException e) {
throw IOUtils.toInterruptedIOException(ProtoUtils.toString(request)
+ " sending from " + peer + " is interrupted.", e);
} catch (ExecutionException e) {
throw IOUtils.toIOException(e);
+ } catch (TimeoutException e) {
+ throw new TimeoutIOException(e.getMessage(), e);
}
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index bc6ebdc..f7655f6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -19,6 +19,7 @@ package org.apache.ratis.netty.client;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyRpcProxy;
import org.apache.ratis.protocol.*;
import org.apache.ratis.proto.RaftProtos;
@@ -31,8 +32,8 @@ import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
import java.io.IOException;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
- public NettyClientRpc(ClientId clientId) {
- super(new NettyRpcProxy.PeerMap(clientId.toString()));
+ public NettyClientRpc(ClientId clientId, RaftProperties properties) {
+ super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
}
@Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 07a6b71..863be84 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -96,7 +96,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
/** Constructs a netty server with the given port. */
private NettyRpcService(RaftServer server) {
- super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString()));
+ super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString(), server.getProperties()));
this.server = server;
final ChannelInitializer<SocketChannel> initializer
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 814416d..b8076cb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
@@ -152,6 +153,40 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
Assert.assertEquals(leader.getId(), lastServerLeaderId);
}
+ protected void testDisconnectLeader() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(3)) {
+ cluster.start();
+
+ final RaftServerImpl leader = waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ client.send(new RaftTestUtil.SimpleMessage("message"));
+ Thread.sleep(1000);
+ isolate(cluster, leader.getId());
+ RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("message"));
+ Assert.assertNotEquals(reply.getReplierId(), leader.getId());
+ Assert.assertTrue(reply.isSuccess());
+ } finally {
+ deIsolate(cluster, leader.getId());
+ }
+
+ cluster.shutdown();
+ }
+ }
+
+ private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
+ try {
+ BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
+ cluster.setBlockRequestsFrom(id.toString(), true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
+ BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
+ cluster.setBlockRequestsFrom(id.toString(), false);
+ }
+
@Test
public void testLeaderElectionMetrics() throws IOException, InterruptedException {
LOG.info("Running testLeaderElectionMetrics");
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
index eb08336..7730cb1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
@@ -33,4 +33,9 @@ public class TestLeaderElectionWithGrpc
MiniRaftClusterWithGrpc.sendServerRequestInjection.clear();
BlockRequestHandlingInjection.getInstance().unblockAll();
}
+
+ @Test
+ public void testDisconnectLeader() throws Exception {
+ super.testDisconnectLeader();
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
index 6d40b60..f84bbb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
@@ -32,4 +32,10 @@ public class TestLeaderElectionWithNetty
MiniRaftClusterWithNetty.sendServerRequest.clear();
BlockRequestHandlingInjection.getInstance().unblockAll();
}
+
+ @Test
+ public void testDisconnectLeader() throws Exception {
+ super.testDisconnectLeader();
+ }
+
}