You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by wi...@apache.org on 2023/03/31 01:26:50 UTC

[ratis] 02/02: RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)

This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8a226a33b09c9f2dd098411b830a6d1da7bb664b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 28 22:57:41 2023 +0800

    RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)
---
 .../java/org/apache/ratis/util/StringUtils.java    | 10 ++----
 .../apache/ratis/util/function/StringSupplier.java | 42 ++++++++++++++++++++++
 .../grpc/server/GrpcServerProtocolClient.java      |  5 +--
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 27 ++++++++------
 .../org/apache/ratis/grpc/util/GrpcTestClient.java |  5 ++-
 5 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index db1323f06..68c76ba99 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
 import org.apache.ratis.thirdparty.com.google.common.collect.Interner;
 import org.apache.ratis.thirdparty.com.google.common.collect.Interners;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.StringSupplier;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -138,13 +139,8 @@ public final class StringUtils {
     return stm.toString();
   }
 
-  public static Object stringSupplierAsObject(Supplier<String> supplier) {
-    return new Object() {
-      @Override
-      public String toString() {
-        return supplier.get();
-      }
-    };
+  public static StringSupplier stringSupplierAsObject(Supplier<String> supplier) {
+    return StringSupplier.get(supplier);
   }
 
   public static <K, V> String map2String(Map<K, V> map) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
new file mode 100644
index 000000000..50bbb244e
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import java.util.function.Supplier;
+
+/** A {@link Supplier} of {@link String}; {@link #get(Supplier)} builds one whose {@link Object#toString()} is supplied too. */
+@FunctionalInterface
+public interface StringSupplier extends Supplier<String> {
+  /**
+   * @return a {@link StringSupplier} whose {@link Supplier#get()} and {@link Object#toString()}
+   *         both delegate to the given {@link Supplier} on every call; the result is not cached.
+   */
+  static StringSupplier get(Supplier<String> supplier) {
+    return new StringSupplier() {
+      @Override
+      public String get() {
+        return supplier.get();
+      }
+
+      @Override
+      public String toString() {
+        return supplier.get();
+      }
+    };
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index d1bb70728..34f014ebc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -21,6 +21,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
@@ -138,8 +139,8 @@ public class GrpcServerProtocolClient implements Closeable {
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
       String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return StreamObserverWithTimeout.newInstance(name, timeout, limit,
-        i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
+    return StreamObserverWithTimeout.newInstance(name, ServerStringUtils::toInstallSnapshotRequestString,
+        timeout, limit, i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
   // short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 8fa30b1cc..723a5dd99 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -24,6 +24,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.function.StringSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,8 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
 
   public static <T> StreamObserverWithTimeout<T> newInstance(
-      String name, TimeDuration timeout, int outstandingLimit,
+      String name, Function<T, String> request2String,
+      TimeDuration timeout, int outstandingLimit,
       Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
     final AtomicInteger responseCount = new AtomicInteger();
     final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
@@ -46,11 +48,13 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
         semaphore.release();
       }
     });
-    return new StreamObserverWithTimeout<>(
-        name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
+    return new StreamObserverWithTimeout<>(name, request2String,
+        timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
   }
 
   private final String name;
+  private final Function<T, String> requestToStringFunction;
+
   private final TimeDuration timeout;
   private final StreamObserver<T> observer;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -60,16 +64,18 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   private final IntSupplier responseCount;
   private final ResourceSemaphore semaphore;
 
-  private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
-      ResourceSemaphore semaphore, StreamObserver<T> observer) {
+  private StreamObserverWithTimeout(String name, Function<T, String> requestToStringFunction,
+      TimeDuration timeout, IntSupplier responseCount, ResourceSemaphore semaphore, StreamObserver<T> observer) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.requestToStringFunction = requestToStringFunction;
+
     this.timeout = timeout;
     this.responseCount = responseCount;
     this.semaphore = semaphore;
     this.observer = observer;
   }
 
-  private void acquire(T request) {
+  private void acquire(StringSupplier request) {
     if (semaphore == null) {
       return;
     }
@@ -88,14 +94,15 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
 
   @Override
   public void onNext(T request) {
-    acquire(request);
+    final StringSupplier requestString = StringSupplier.get(() -> requestToStringFunction.apply(request));
+    acquire(requestString);
     observer.onNext(request);
     final int id = requestCount.incrementAndGet();
-    scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
-        LOG, () -> name + ": Timeout check failed for request: " + request);
+    scheduler.onTimeout(timeout, () -> handleTimeout(id, requestString),
+        LOG, () -> name + ": Timeout check failed for request: " + requestString);
   }
 
-  private void handleTimeout(int id, T request) {
+  private void handleTimeout(int id, StringSupplier request) {
     if (id > responseCount.getAsInt()) {
       onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 7434b2d79..130c05eb9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +54,9 @@ class GrpcTestClient implements Closeable {
   }
 
   static StreamObserverFactory withTimeout(TimeDuration timeout) {
-    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
+    final String className = JavaUtils.getClassSimpleName(HelloRequest.class) + ":";
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test",
+        r -> className + r.getName(), timeout, 2,
         i -> stub.withInterceptors(i).hello(responseHandler));
   }