You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by wi...@apache.org on 2023/03/31 01:26:48 UTC

[ratis] branch branch-2_readIndex updated (42c3c6ba4 -> 8a226a33b)

This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a change to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git


    from 42c3c6ba4 RATIS-1821. Upgrade ratis-thirdparty version to 1.0.4 (#862)
     new 2b373f6bc RATIS-1822. Disable first election on changeToFollower (#863)
     new 8a226a33b RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/ratis/util/StringUtils.java    | 10 +++-----
 .../{CheckedRunnable.java => StringSupplier.java}  | 28 +++++++++++++---------
 .../grpc/server/GrpcServerProtocolClient.java      |  5 ++--
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 27 +++++++++++++--------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  2 +-
 .../org/apache/ratis/grpc/util/GrpcTestClient.java |  5 +++-
 6 files changed, 45 insertions(+), 32 deletions(-)
 copy ratis-common/src/main/java/org/apache/ratis/util/function/{CheckedRunnable.java => StringSupplier.java} (61%)


[ratis] 02/02: RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8a226a33b09c9f2dd098411b830a6d1da7bb664b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 28 22:57:41 2023 +0800

    RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864)
---
 .../java/org/apache/ratis/util/StringUtils.java    | 10 ++----
 .../apache/ratis/util/function/StringSupplier.java | 42 ++++++++++++++++++++++
 .../grpc/server/GrpcServerProtocolClient.java      |  5 +--
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 27 ++++++++------
 .../org/apache/ratis/grpc/util/GrpcTestClient.java |  5 ++-
 5 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index db1323f06..68c76ba99 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
 import org.apache.ratis.thirdparty.com.google.common.collect.Interner;
 import org.apache.ratis.thirdparty.com.google.common.collect.Interners;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.StringSupplier;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -138,13 +139,8 @@ public final class StringUtils {
     return stm.toString();
   }
 
-  public static Object stringSupplierAsObject(Supplier<String> supplier) {
-    return new Object() {
-      @Override
-      public String toString() {
-        return supplier.get();
-      }
-    };
+  public static StringSupplier stringSupplierAsObject(Supplier<String> supplier) {
+    return StringSupplier.get(supplier);
   }
 
   public static <K, V> String map2String(Map<K, V> map) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
new file mode 100644
index 000000000..50bbb244e
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import java.util.function.Supplier;
+
+/** Supplier of {@link String}. */
+@FunctionalInterface
+public interface StringSupplier extends Supplier<String> {
+  /**
+   * @return a {@link StringSupplier} which uses the given {@link Supplier}
+   *         to override both {@link Supplier#get()} and {@link Object#toString()}.
+   */
+  static StringSupplier get(Supplier<String> supplier) {
+    return new StringSupplier() {
+      @Override
+      public String get() {
+        return supplier.get();
+      }
+
+      @Override
+      public String toString() {
+        return supplier.get();
+      }
+    };
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index d1bb70728..34f014ebc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -21,6 +21,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
@@ -138,8 +139,8 @@ public class GrpcServerProtocolClient implements Closeable {
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
       String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return StreamObserverWithTimeout.newInstance(name, timeout, limit,
-        i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
+    return StreamObserverWithTimeout.newInstance(name, ServerStringUtils::toInstallSnapshotRequestString,
+        timeout, limit, i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
   // short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 8fa30b1cc..723a5dd99 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -24,6 +24,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.function.StringSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,8 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
 
   public static <T> StreamObserverWithTimeout<T> newInstance(
-      String name, TimeDuration timeout, int outstandingLimit,
+      String name, Function<T, String> request2String,
+      TimeDuration timeout, int outstandingLimit,
       Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
     final AtomicInteger responseCount = new AtomicInteger();
     final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
@@ -46,11 +48,13 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
         semaphore.release();
       }
     });
-    return new StreamObserverWithTimeout<>(
-        name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
+    return new StreamObserverWithTimeout<>(name, request2String,
+        timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
   }
 
   private final String name;
+  private final Function<T, String> requestToStringFunction;
+
   private final TimeDuration timeout;
   private final StreamObserver<T> observer;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -60,16 +64,18 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   private final IntSupplier responseCount;
   private final ResourceSemaphore semaphore;
 
-  private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
-      ResourceSemaphore semaphore, StreamObserver<T> observer) {
+  private StreamObserverWithTimeout(String name, Function<T, String> requestToStringFunction,
+      TimeDuration timeout, IntSupplier responseCount, ResourceSemaphore semaphore, StreamObserver<T> observer) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.requestToStringFunction = requestToStringFunction;
+
     this.timeout = timeout;
     this.responseCount = responseCount;
     this.semaphore = semaphore;
     this.observer = observer;
   }
 
-  private void acquire(T request) {
+  private void acquire(StringSupplier request) {
     if (semaphore == null) {
       return;
     }
@@ -88,14 +94,15 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
 
   @Override
   public void onNext(T request) {
-    acquire(request);
+    final StringSupplier requestString = StringSupplier.get(() -> requestToStringFunction.apply(request));
+    acquire(requestString);
     observer.onNext(request);
     final int id = requestCount.incrementAndGet();
-    scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
-        LOG, () -> name + ": Timeout check failed for request: " + request);
+    scheduler.onTimeout(timeout, () -> handleTimeout(id, requestString),
+        LOG, () -> name + ": Timeout check failed for request: " + requestString);
   }
 
-  private void handleTimeout(int id, T request) {
+  private void handleTimeout(int id, StringSupplier request) {
     if (id > responseCount.getAsInt()) {
       onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 7434b2d79..130c05eb9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +54,9 @@ class GrpcTestClient implements Closeable {
   }
 
   static StreamObserverFactory withTimeout(TimeDuration timeout) {
-    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
+    final String className = JavaUtils.getClassSimpleName(HelloRequest.class) + ":";
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test",
+        r -> className + r.getName(), timeout, 2,
         i -> stub.withInterceptors(i).hello(responseHandler));
   }
 


[ratis] 01/02: RATIS-1822. Disable first election on changeToFollower (#863)

Posted by wi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 2b373f6bc3c278da2994967250e985d83617009a
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Mar 28 21:21:29 2023 +0800

    RATIS-1822. Disable first election on changeToFollower (#863)
---
 .../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5ecbc36d4..128900fb5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -545,6 +545,7 @@ class RaftServerImpl implements RaftServer.Division,
         role.shutdownFollowerState();
       }
       role.startFollowerState(this, reason);
+      firstElectionSinceStartup.set(false);
     }
     return metadataUpdated;
   }
@@ -1851,7 +1852,6 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   void onGroupLeaderElected() {
-    this.firstElectionSinceStartup.set(false);
     transferLeadership.complete(TransferLeadership.Result.SUCCESS);
   }
 }