You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/03/23 10:23:57 UTC
incubator-ratis git commit: RATIS-215. Support timeout for async
calls (Grpc). Contributed by Lokesh Jain
Repository: incubator-ratis
Updated Branches:
refs/heads/master b47ba4266 -> 288ea4f42
RATIS-215. Support timeout for async calls (Grpc). Contributed by Lokesh Jain
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/288ea4f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/288ea4f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/288ea4f4
Branch: refs/heads/master
Commit: 288ea4f427f3670ebcbe8027587fbcf2a4d56986
Parents: b47ba42
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Mar 23 18:23:16 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Mar 23 18:23:16 2018 +0800
----------------------------------------------------------------------
.../java/org/apache/ratis/rpc/RpcTimeout.java | 55 ++++++++++++++++++++
.../grpc/client/RaftClientProtocolClient.java | 29 ++++++++---
.../java/org/apache/ratis/RaftAsyncTests.java | 13 +++++
.../java/org/apache/ratis/RaftBasicTests.java | 2 +-
4 files changed, 90 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
new file mode 100644
index 0000000..6c2c7fc
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.rpc;
+
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class RpcTimeout {
+ private ScheduledExecutorService timeoutScheduler = null;
+ private TimeDuration callTimeout;
+
+ public RpcTimeout(TimeDuration callTimeout, boolean initialize) {
+ this.callTimeout = callTimeout;
+ if (initialize) {
+ initialize();
+ }
+ }
+
+ public synchronized void initialize() {
+ timeoutScheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ public void onTimeout(Runnable task) {
+ Preconditions.assertTrue(timeoutScheduler != null);
+ TimeUnit unit = callTimeout.getUnit();
+ timeoutScheduler.schedule(task, callTimeout.toInt(unit), unit);
+ }
+
+ public TimeDuration getCallTimeout() {
+ return callTimeout;
+ }
+
+ public void close() {
+ timeoutScheduler.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index feb5cc5..a39e19c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.client;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcTimeout;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -41,10 +42,12 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.Supplier;
public class RaftClientProtocolClient implements Closeable {
@@ -136,14 +139,11 @@ public class RaftClientProtocolClient implements Closeable {
class AsyncStreamObservers implements Closeable {
/** Request map: callId -> future */
private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+ private final RpcTimeout
+ rpcTimeout = new RpcTimeout(timeout, true);
private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
@Override
public void onNext(RaftClientReplyProto proto) {
- final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
- if (map == null) {
- LOG.warn("replyStreamObserver onNext map == null");
- return;
- }
final long callId = proto.getRpcReply().getCallId();
try {
final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
@@ -152,9 +152,9 @@ public class RaftClientProtocolClient implements Closeable {
completeReplyExceptionally(nle, NotLeaderException.class.getName());
return;
}
- map.remove(callId).complete(reply);
+ handleReplyFuture(callId, f -> f.complete(reply));
} catch (Throwable t) {
- map.get(callId).completeExceptionally(t);
+ handleReplyFuture(callId, f -> f.completeExceptionally(t));
}
}
@@ -181,16 +181,29 @@ public class RaftClientProtocolClient implements Closeable {
() -> getName() + ":" + getClass().getSimpleName());
try {
requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+ rpcTimeout.onTimeout(() -> timeoutCheck(request));
} catch(Throwable t) {
- f.completeExceptionally(t);
+ handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
}
return f;
}
+ private void timeoutCheck(RaftClientRequest request) {
+ handleReplyFuture(request.getCallId(),
+ f -> f.completeExceptionally(new IOException("Request timeout " + rpcTimeout.getCallTimeout() + ": " + request)));
+ }
+
+ private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
+ Optional.ofNullable(replies.get())
+ .map(replyMap -> replyMap.remove(callId))
+ .ifPresent(handler);
+ }
+
@Override
public void close() {
requestStreamObserver.onCompleted();
completeReplyExceptionally(null, "close");
+ rpcTimeout.close();
}
private void completeReplyExceptionally(Throwable t, String event) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index a1835c0..af61aca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -33,6 +34,7 @@ import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
import org.junit.*;
import java.io.IOException;
@@ -61,6 +63,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
properties = new RaftProperties();
properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
+ TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+ RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
}
@Test
@@ -243,4 +247,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
Assert.assertTrue(reply.isSuccess());
return reply.getMessage().getContent();
}
+
+ @Test
+ public void testRequestTimeout()
+ throws IOException, InterruptedException, ExecutionException {
+ final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ cluster.start();
+ RaftBasicTests.testRequestTimeout(true, cluster, LOG, properties);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/288ea4f4/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8aa05cd..b0980f4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -61,7 +61,7 @@ public abstract class RaftBasicTests extends BaseTest {
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration
- .valueOf(10, TimeUnit.SECONDS));
+ .valueOf(5, TimeUnit.SECONDS));
}
public static final int NUM_SERVERS = 5;