You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/14 10:52:23 UTC

[ratis] branch master updated: RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 917671cf3 RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)
917671cf3 is described below

commit 917671cf35cec5357f078158873d98a13d29157c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 14 18:52:16 2023 +0800

    RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)
---
 .../org/apache/ratis/util/ResourceSemaphore.java   | 28 +++++-----------
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  4 +--
 .../grpc/server/GrpcServerProtocolClient.java      |  4 +--
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 37 +++++++++++++++++++---
 .../apache/ratis/server/impl/PendingRequests.java  | 17 +++++++---
 .../server/leader/InstallSnapshotRequests.java     |  2 +-
 .../org/apache/ratis/grpc/util/GrpcTestClient.java |  2 +-
 .../grpc/util/TestStreamObserverWithTimeout.java   |  3 --
 .../apache/ratis/util/TestResourceSemaphore.java   | 13 ++++----
 9 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
index b9e0ff5c7..fb75feeaa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -88,14 +88,9 @@ public class ResourceSemaphore extends Semaphore {
   /**
    * Track a group of resources with a list of {@link ResourceSemaphore}s.
    */
-
-  public enum ResourceAcquireStatus {
-    SUCCESS,
-    FAILED_IN_ELEMENT_LIMIT,
-    FAILED_IN_BYTE_SIZE_LIMIT
-  }
-
   public static class Group {
+    public static final int SUCCESS = -1;
+
     private final List<ResourceSemaphore> resources;
 
     public Group(int... limits) {
@@ -115,7 +110,8 @@ public class ResourceSemaphore extends Semaphore {
       return resources.get(i);
     }
 
-    public ResourceAcquireStatus tryAcquire(int... permits) {
+    /** @return {@link #SUCCESS} if successfully acquired; otherwise, return the failed index. */
+    public int tryAcquire(int... permits) {
       Preconditions.assertTrue(permits.length == resources.size(),
           () -> "items.length = " + permits.length + " != resources.size() = " + resources.size());
       int i = 0;
@@ -126,24 +122,16 @@ public class ResourceSemaphore extends Semaphore {
         }
       }
 
-
       if (i == permits.length) {
-        return ResourceAcquireStatus.SUCCESS; // successfully acquired all resources
-      }
-
-      ResourceAcquireStatus acquireStatus;
-      if (i == 0) {
-        acquireStatus =  ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
-      } else {
-        acquireStatus =  ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
+        return SUCCESS; // successfully acquired all resources
       }
 
       // failed at i, releasing all previous resources
-      for(i--; i >= 0; i--) {
-        resources.get(i).release(permits[i]);
+      for(int k = i - 1; k >= 0; k--) {
+        resources.get(k).release(permits[k]);
       }
 
-      return acquireStatus;
+      return i;
     }
 
     public void acquire(int... permits) throws InterruptedException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1b954d6a7..8fe285a2c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -595,7 +595,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     final String requestId = UUID.randomUUID().toString();
     try {
       snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
-          requestTimeoutDuration, responseHandler);
+          requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);
@@ -646,7 +646,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
     try {
       snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
-          requestTimeoutDuration, responseHandler);
+          requestTimeoutDuration, 0, responseHandler);
 
       snapshotRequestObserver.onNext(request);
       getFollower().updateLastRpcSendTime(false);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index c3f8730e7..d1bb70728 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -137,8 +137,8 @@ public class GrpcServerProtocolClient implements Closeable {
   }
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return StreamObserverWithTimeout.newInstance(name, timeout,
+      String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    return StreamObserverWithTimeout.newInstance(name, timeout, limit,
         i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 2b875f3ed..8fa30b1cc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
@@ -34,13 +35,19 @@ import java.util.function.IntSupplier;
 public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
 
-  public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+  public static <T> StreamObserverWithTimeout<T> newInstance(
+      String name, TimeDuration timeout, int outstandingLimit,
       Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
     final AtomicInteger responseCount = new AtomicInteger();
-    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
-        r -> responseCount.getAndIncrement());
+    final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
+    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(r -> {
+      responseCount.getAndIncrement();
+      if (semaphore != null) {
+        semaphore.release();
+      }
+    });
     return new StreamObserverWithTimeout<>(
-        name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+        name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
   }
 
   private final String name;
@@ -51,17 +58,37 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   private final AtomicBoolean isClose = new AtomicBoolean();
   private final AtomicInteger requestCount = new AtomicInteger();
   private final IntSupplier responseCount;
+  private final ResourceSemaphore semaphore;
 
   private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
-      StreamObserver<T> observer) {
+      ResourceSemaphore semaphore, StreamObserver<T> observer) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
     this.timeout = timeout;
     this.responseCount = responseCount;
+    this.semaphore = semaphore;
     this.observer = observer;
   }
 
+  private void acquire(T request) {
+    if (semaphore == null) {
+      return;
+    }
+    boolean acquired = false;
+    for (; !acquired && !isClose.get(); ) {
+      try {
+        acquired = semaphore.tryAcquire(timeout.getDuration(), timeout.getUnit());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("Interrupted onNext " + request, e);
+      }
+    }
+    if (!acquired) {
+      throw new IllegalStateException("Failed onNext " + request + ": already closed.");
+    }
+  }
+
   @Override
   public void onNext(T request) {
+    acquire(request);
     observer.onNext(request);
     final int id = requestCount.incrementAndGet();
     scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 0812c29fd..2f2ca8f7d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -61,6 +61,12 @@ class PendingRequests {
 
   static class Permit {}
 
+  /**
+   * The return type of {@link RequestLimits#tryAcquire(int)}.
+   * The order of the enum value must match the order in {@link RequestLimits}.
+   */
+  enum Acquired { FAILED_IN_ELEMENT_LIMIT, FAILED_IN_BYTE_SIZE_LIMIT, SUCCESS }
+
   static class RequestLimits extends ResourceSemaphore.Group {
     RequestLimits(int elementLimit, int megabyteLimit) {
       super(elementLimit, megabyteLimit);
@@ -74,8 +80,9 @@ class PendingRequests {
       return get(1).used();
     }
 
-    ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
-      return tryAcquire(1, messageSizeMb);
+    Acquired tryAcquire(int messageSizeMb) {
+      final int acquired = tryAcquire(1, messageSizeMb);
+      return acquired == SUCCESS? PendingRequests.Acquired.SUCCESS: PendingRequests.Acquired.values()[acquired];
     }
 
     void releaseExtraMb(int extraMb) {
@@ -112,13 +119,13 @@ class PendingRequests {
     Permit tryAcquire(Message message) {
       final int messageSize = Message.getSize(message);
       final int messageSizeMb = roundUpMb(messageSize );
-      final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
+      final Acquired acquired = resource.tryAcquire(messageSizeMb);
       LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
-      if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
+      if (acquired == Acquired.FAILED_IN_ELEMENT_LIMIT) {
         raftServerMetrics.onRequestQueueLimitHit();
         raftServerMetrics.onResourceLimitHit();
         return null;
-      } else if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT) {
+      } else if (acquired == Acquired.FAILED_IN_BYTE_SIZE_LIMIT) {
         raftServerMetrics.onRequestByteSizeLimitHit();
         raftServerMetrics.onResourceLimitHit();
         return null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index f52253b24..cdb6603c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -113,7 +113,7 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
   private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
     final int numFiles = snapshot.getFiles().size();
     if (fileIndex >= numFiles) {
-      throw new NoSuchElementException();
+      throw new NoSuchElementException("fileIndex = " + fileIndex + " >= numFiles = " + numFiles);
     }
     final FileInfo info = snapshot.getFiles().get(fileIndex);
     try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 0923b27fe..7434b2d79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -53,7 +53,7 @@ class GrpcTestClient implements Closeable {
   }
 
   static StreamObserverFactory withTimeout(TimeDuration timeout) {
-    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
         i -> stub.withInterceptors(i).hello(responseHandler));
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
index dac58812d..1439f9b9d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -92,9 +92,6 @@ public class TestStreamObserverWithTimeout extends BaseTest {
 
         final List<CompletableFuture<String>> futures = new ArrayList<>();
         for (String m : messages) {
-          if (type == Type.WithTimeout) {
-            timeout.sleep();
-          }
           futures.add(client.send(m));
         }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
index d085161cf..6fe1aed7e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
@@ -24,17 +24,17 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.SUCCESS;
+import static org.apache.ratis.util.ResourceSemaphore.Group.SUCCESS;
 
 public class TestResourceSemaphore extends BaseTest {
   @Test(timeout = 5000)
   public void testGroup() throws InterruptedException, TimeoutException {
+    final int FAILED_IN_ELEMENT_LIMIT = 0;
+    final int FAILED_IN_BYTE_SIZE_LIMIT = 1;
     final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
 
     assertUsed(g, 0, 0);
-    assertAcquire(g, ResourceSemaphore.ResourceAcquireStatus.SUCCESS, 1, 1);
+    assertAcquire(g, SUCCESS, 1, 1);
     assertUsed(g, 1, 1);
     assertAcquire(g, FAILED_IN_BYTE_SIZE_LIMIT, 1, 1);
     assertUsed(g, 1, 1);
@@ -86,9 +86,8 @@ public class TestResourceSemaphore extends BaseTest {
     }
   }
 
-  static void assertAcquire(ResourceSemaphore.Group g, ResourceSemaphore.ResourceAcquireStatus expected,
-      int... permits) {
-    final ResourceSemaphore.ResourceAcquireStatus computed = g.tryAcquire(permits);
+  static void assertAcquire(ResourceSemaphore.Group g, int expected, int... permits) {
+    final int computed = g.tryAcquire(permits);
     Assert.assertEquals(expected, computed);
   }