You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/27 11:37:47 UTC

[incubator-ratis] branch master updated: RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a3e05  RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.
d5a3e05 is described below

commit d5a3e05937d6b05b82d023148102de3d7bc9898c
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Apr 27 17:06:45 2020 +0530

    RATIS-884. Support timeout for client blocking api (Netty). Contributed by Burcu Ozkan.
---
 .../java/org/apache/ratis/netty/NettyFactory.java  |  2 +-
 .../java/org/apache/ratis/netty/NettyRpcProxy.java | 21 +++++++++----
 .../apache/ratis/netty/client/NettyClientRpc.java  |  5 ++--
 .../apache/ratis/netty/server/NettyRpcService.java |  2 +-
 .../ratis/server/impl/LeaderElectionTests.java     | 35 ++++++++++++++++++++++
 .../ratis/grpc/TestLeaderElectionWithGrpc.java     |  5 ++++
 .../ratis/netty/TestLeaderElectionWithNetty.java   |  6 ++++
 7 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 5e9c2e9..450b95f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -42,6 +42,6 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac
 
   @Override
   public NettyClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) {
-    return new NettyClientRpc(clientId);
+    return new NettyClientRpc(clientId, properties);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 1c83629..15e54f4 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -17,7 +17,10 @@
  */
 package org.apache.ratis.netty;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
@@ -31,6 +34,7 @@ import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -38,22 +42,25 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
 
 public class NettyRpcProxy implements Closeable {
   public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
     private final EventLoopGroup group = new NioEventLoopGroup();
+    private final RaftProperties properties;
 
-    public PeerMap(String name) {
+    public PeerMap(String name, RaftProperties properties) {
       super(name);
+      this.properties = properties;
     }
 
     @Override
     public NettyRpcProxy createProxyImpl(RaftPeer peer)
-        throws IOException {
+            throws IOException {
       try {
-        return new NettyRpcProxy(peer, group);
+        return new NettyRpcProxy(peer, properties, group);
       } catch (InterruptedException e) {
         throw IOUtils.toInterruptedIOException("Failed connecting to " + peer, e);
       }
@@ -153,10 +160,12 @@ public class NettyRpcProxy implements Closeable {
 
   private final RaftPeer peer;
   private final Connection connection;
+  private final TimeDuration requestTimeoutDuration;
 
-  public NettyRpcProxy(RaftPeer peer, EventLoopGroup group) throws InterruptedException {
+  public NettyRpcProxy(RaftPeer peer, RaftProperties properties, EventLoopGroup group) throws InterruptedException {
     this.peer = peer;
     this.connection = new Connection(group);
+    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   @Override
@@ -172,12 +181,14 @@ public class NettyRpcProxy implements Closeable {
 
     try {
       channelFuture.sync();
-      return reply.get();
+      return reply.get(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit());
     } catch (InterruptedException e) {
       throw IOUtils.toInterruptedIOException(ProtoUtils.toString(request)
           + " sending from " + peer + " is interrupted.", e);
     } catch (ExecutionException e) {
       throw IOUtils.toIOException(e);
+    } catch (TimeoutException e) {
+      throw new TimeoutIOException(e.getMessage(), e);
     }
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index bc6ebdc..f7655f6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -19,6 +19,7 @@ package org.apache.ratis.netty.client;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.netty.NettyRpcProxy;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.proto.RaftProtos;
@@ -31,8 +32,8 @@ import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import java.io.IOException;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
-  public NettyClientRpc(ClientId clientId) {
-    super(new NettyRpcProxy.PeerMap(clientId.toString()));
+  public NettyClientRpc(ClientId clientId, RaftProperties properties) {
+    super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 07a6b71..863be84 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -96,7 +96,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
 
   /** Constructs a netty server with the given port. */
   private NettyRpcService(RaftServer server) {
-    super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString()));
+    super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString(), server.getProperties()));
     this.server = server;
 
     final ChannelInitializer<SocketChannel> initializer
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 814416d..b8076cb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -152,6 +153,40 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     Assert.assertEquals(leader.getId(), lastServerLeaderId);
   }
 
+  protected void testDisconnectLeader() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServerImpl leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.send(new RaftTestUtil.SimpleMessage("message"));
+        Thread.sleep(1000);
+        isolate(cluster, leader.getId());
+        RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertNotEquals(reply.getReplierId(), leader.getId());
+        Assert.assertTrue(reply.isSuccess());
+      } finally {
+        deIsolate(cluster, leader.getId());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
+  private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
+    try {
+      BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
+      cluster.setBlockRequestsFrom(id.toString(), true);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
+    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
+    cluster.setBlockRequestsFrom(id.toString(), false);
+  }
+
   @Test
   public void testLeaderElectionMetrics() throws IOException, InterruptedException {
     LOG.info("Running testLeaderElectionMetrics");
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
index eb08336..7730cb1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
@@ -33,4 +33,9 @@ public class TestLeaderElectionWithGrpc
     MiniRaftClusterWithGrpc.sendServerRequestInjection.clear();
     BlockRequestHandlingInjection.getInstance().unblockAll();
   }
+
+  @Test
+  public void testDisconnectLeader() throws Exception {
+    super.testDisconnectLeader();
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
index 6d40b60..f84bbb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestLeaderElectionWithNetty.java
@@ -32,4 +32,10 @@ public class TestLeaderElectionWithNetty
     MiniRaftClusterWithNetty.sendServerRequest.clear();
     BlockRequestHandlingInjection.getInstance().unblockAll();
   }
+
+  @Test
+  public void testDisconnectLeader() throws Exception {
+    super.testDisconnectLeader();
+  }
+
 }