You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/03/23 10:23:57 UTC

incubator-ratis git commit: RATIS-215. Support timeout for async calls (Grpc). Contributed by Lokesh Jain

Repository: incubator-ratis
Updated Branches:
  refs/heads/master b47ba4266 -> 288ea4f42


RATIS-215. Support timeout for async calls (Grpc).  Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/288ea4f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/288ea4f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/288ea4f4

Branch: refs/heads/master
Commit: 288ea4f427f3670ebcbe8027587fbcf2a4d56986
Parents: b47ba42
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Mar 23 18:23:16 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Mar 23 18:23:16 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/rpc/RpcTimeout.java   | 55 ++++++++++++++++++++
 .../grpc/client/RaftClientProtocolClient.java   | 29 ++++++++---
 .../java/org/apache/ratis/RaftAsyncTests.java   | 13 +++++
 .../java/org/apache/ratis/RaftBasicTests.java   |  2 +-
 4 files changed, 90 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
new file mode 100644
index 0000000..6c2c7fc
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.rpc;
+
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class RpcTimeout {
+  private ScheduledExecutorService timeoutScheduler = null;
+  private TimeDuration callTimeout;
+
+  public RpcTimeout(TimeDuration callTimeout, boolean initialize) {
+    this.callTimeout = callTimeout;
+    if (initialize) {
+      initialize();
+    }
+  }
+
+  public synchronized void initialize() {
+    timeoutScheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  public void onTimeout(Runnable task) {
+    Preconditions.assertTrue(timeoutScheduler != null);
+    TimeUnit unit = callTimeout.getUnit();
+    timeoutScheduler.schedule(task, callTimeout.toInt(unit), unit);
+  }
+
+  public TimeDuration getCallTimeout() {
+    return callTimeout;
+  }
+
+  public void close() {
+    timeoutScheduler.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index feb5cc5..a39e19c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.client;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcTimeout;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -41,10 +42,12 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 public class RaftClientProtocolClient implements Closeable {
@@ -136,14 +139,11 @@ public class RaftClientProtocolClient implements Closeable {
   class AsyncStreamObservers implements Closeable {
     /** Request map: callId -> future */
     private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+    private final RpcTimeout
+        rpcTimeout = new RpcTimeout(timeout, true);
     private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
       @Override
       public void onNext(RaftClientReplyProto proto) {
-        final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
-        if (map == null) {
-          LOG.warn("replyStreamObserver onNext map == null");
-          return;
-        }
         final long callId = proto.getRpcReply().getCallId();
         try {
           final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
@@ -152,9 +152,9 @@ public class RaftClientProtocolClient implements Closeable {
             completeReplyExceptionally(nle, NotLeaderException.class.getName());
             return;
           }
-          map.remove(callId).complete(reply);
+          handleReplyFuture(callId, f -> f.complete(reply));
         } catch (Throwable t) {
-          map.get(callId).completeExceptionally(t);
+          handleReplyFuture(callId, f -> f.completeExceptionally(t));
         }
       }
 
@@ -181,16 +181,29 @@ public class RaftClientProtocolClient implements Closeable {
           () -> getName() + ":" + getClass().getSimpleName());
       try {
         requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+        rpcTimeout.onTimeout(() -> timeoutCheck(request));
       } catch(Throwable t) {
-        f.completeExceptionally(t);
+        handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
       }
       return f;
     }
 
+    private void timeoutCheck(RaftClientRequest request) {
+      handleReplyFuture(request.getCallId(),
+          f -> f.completeExceptionally(new IOException("Request timeout " + rpcTimeout.getCallTimeout() + ": " + request)));
+    }
+
+    private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
+      Optional.ofNullable(replies.get())
+          .map(replyMap -> replyMap.remove(callId))
+          .ifPresent(handler);
+    }
+
     @Override
     public void close() {
       requestStreamObserver.onCompleted();
       completeReplyExceptionally(null, "close");
+      rpcTimeout.close();
     }
 
     private void completeReplyExceptionally(Throwable t, String event) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index a1835c0..af61aca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -33,6 +34,7 @@ import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.*;
 
 import java.io.IOException;
@@ -61,6 +63,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     properties = new RaftProperties();
     properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
+    TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+    RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
   }
 
   @Test
@@ -243,4 +247,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     Assert.assertTrue(reply.isSuccess());
     return reply.getMessage().getContent();
   }
+
+  @Test
+  public void testRequestTimeout()
+      throws IOException, InterruptedException, ExecutionException {
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    cluster.start();
+    RaftBasicTests.testRequestTimeout(true, cluster, LOG, properties);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8aa05cd..b0980f4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -61,7 +61,7 @@ public abstract class RaftBasicTests extends BaseTest {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
     RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration
-        .valueOf(10, TimeUnit.SECONDS));
+        .valueOf(5, TimeUnit.SECONDS));
   }
 
   public static final int NUM_SERVERS = 5;