You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by wi...@apache.org on 2023/03/28 14:57:48 UTC
[ratis] branch master updated: RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)
This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b517002c7 RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)
b517002c7 is described below
commit b517002c7735dc9fb826914e29f8b015bb6e2281
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 28 22:57:41 2023 +0800
RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)
---
.../java/org/apache/ratis/util/StringUtils.java | 10 ++----
.../apache/ratis/util/function/StringSupplier.java | 42 ++++++++++++++++++++++
.../grpc/server/GrpcServerProtocolClient.java | 5 +--
.../ratis/grpc/util/StreamObserverWithTimeout.java | 27 ++++++++------
.../org/apache/ratis/grpc/util/GrpcTestClient.java | 5 ++-
5 files changed, 69 insertions(+), 20 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index db1323f06..68c76ba99 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
import org.apache.ratis.thirdparty.com.google.common.collect.Interner;
import org.apache.ratis.thirdparty.com.google.common.collect.Interners;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.StringSupplier;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -138,13 +139,8 @@ public final class StringUtils {
return stm.toString();
}
- public static Object stringSupplierAsObject(Supplier<String> supplier) {
- return new Object() {
- @Override
- public String toString() {
- return supplier.get();
- }
- };
+ public static StringSupplier stringSupplierAsObject(Supplier<String> supplier) {
+ return StringSupplier.get(supplier);
}
public static <K, V> String map2String(Map<K, V> map) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
new file mode 100644
index 000000000..50bbb244e
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import java.util.function.Supplier;
+
+/** Supplier of {@link String}. */
+@FunctionalInterface
+public interface StringSupplier extends Supplier<String> {
+ /**
+ * @return a {@link StringSupplier} which uses the given {@link Supplier}
+ * to override both {@link Supplier#get()} and {@link Object#toString()}.
+ */
+ static StringSupplier get(Supplier<String> supplier) {
+ return new StringSupplier() {
+ @Override
+ public String get() {
+ return supplier.get();
+ }
+
+ @Override
+ public String toString() {
+ return supplier.get();
+ }
+ };
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index d1bb70728..34f014ebc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -21,6 +21,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
@@ -138,8 +139,8 @@ public class GrpcServerProtocolClient implements Closeable {
StreamObserver<InstallSnapshotRequestProto> installSnapshot(
String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
- return StreamObserverWithTimeout.newInstance(name, timeout, limit,
- i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
+ return StreamObserverWithTimeout.newInstance(name, ServerStringUtils::toInstallSnapshotRequestString,
+ timeout, limit, i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
}
// short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 8fa30b1cc..723a5dd99 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -24,6 +24,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.function.StringSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,8 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
public static <T> StreamObserverWithTimeout<T> newInstance(
- String name, TimeDuration timeout, int outstandingLimit,
+ String name, Function<T, String> request2String,
+ TimeDuration timeout, int outstandingLimit,
Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
final AtomicInteger responseCount = new AtomicInteger();
final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
@@ -46,11 +48,13 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
semaphore.release();
}
});
- return new StreamObserverWithTimeout<>(
- name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
+ return new StreamObserverWithTimeout<>(name, request2String,
+ timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
}
private final String name;
+ private final Function<T, String> requestToStringFunction;
+
private final TimeDuration timeout;
private final StreamObserver<T> observer;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -60,16 +64,18 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
private final IntSupplier responseCount;
private final ResourceSemaphore semaphore;
- private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
- ResourceSemaphore semaphore, StreamObserver<T> observer) {
+ private StreamObserverWithTimeout(String name, Function<T, String> requestToStringFunction,
+ TimeDuration timeout, IntSupplier responseCount, ResourceSemaphore semaphore, StreamObserver<T> observer) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+ this.requestToStringFunction = requestToStringFunction;
+
this.timeout = timeout;
this.responseCount = responseCount;
this.semaphore = semaphore;
this.observer = observer;
}
- private void acquire(T request) {
+ private void acquire(StringSupplier request) {
if (semaphore == null) {
return;
}
@@ -88,14 +94,15 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
@Override
public void onNext(T request) {
- acquire(request);
+ final StringSupplier requestString = StringSupplier.get(() -> requestToStringFunction.apply(request));
+ acquire(requestString);
observer.onNext(request);
final int id = requestCount.incrementAndGet();
- scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
- LOG, () -> name + ": Timeout check failed for request: " + request);
+ scheduler.onTimeout(timeout, () -> handleTimeout(id, requestString),
+ LOG, () -> name + ": Timeout check failed for request: " + requestString);
}
- private void handleTimeout(int id, T request) {
+ private void handleTimeout(int id, StringSupplier request) {
if (id > responseCount.getAsInt()) {
onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 7434b2d79..130c05eb9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,9 @@ class GrpcTestClient implements Closeable {
}
static StreamObserverFactory withTimeout(TimeDuration timeout) {
- return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
+ final String className = JavaUtils.getClassSimpleName(HelloRequest.class) + ":";
+ return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test",
+ r -> className + r.getName(), timeout, 2,
i -> stub.withInterceptors(i).hello(responseHandler));
}