You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/14 10:52:23 UTC
[ratis] branch master updated: RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 917671cf3 RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)
917671cf3 is described below
commit 917671cf35cec5357f078158873d98a13d29157c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 14 18:52:16 2023 +0800
RATIS-1812. Enforce an outstanding request limit in StreamObserverWithTimeout. (#850)
---
.../org/apache/ratis/util/ResourceSemaphore.java | 28 +++++-----------
.../apache/ratis/grpc/server/GrpcLogAppender.java | 4 +--
.../grpc/server/GrpcServerProtocolClient.java | 4 +--
.../ratis/grpc/util/StreamObserverWithTimeout.java | 37 +++++++++++++++++++---
.../apache/ratis/server/impl/PendingRequests.java | 17 +++++++---
.../server/leader/InstallSnapshotRequests.java | 2 +-
.../org/apache/ratis/grpc/util/GrpcTestClient.java | 2 +-
.../grpc/util/TestStreamObserverWithTimeout.java | 3 --
.../apache/ratis/util/TestResourceSemaphore.java | 13 ++++----
9 files changed, 64 insertions(+), 46 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
index b9e0ff5c7..fb75feeaa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -88,14 +88,9 @@ public class ResourceSemaphore extends Semaphore {
/**
* Track a group of resources with a list of {@link ResourceSemaphore}s.
*/
-
- public enum ResourceAcquireStatus {
- SUCCESS,
- FAILED_IN_ELEMENT_LIMIT,
- FAILED_IN_BYTE_SIZE_LIMIT
- }
-
public static class Group {
+ public static final int SUCCESS = -1;
+
private final List<ResourceSemaphore> resources;
public Group(int... limits) {
@@ -115,7 +110,8 @@ public class ResourceSemaphore extends Semaphore {
return resources.get(i);
}
- public ResourceAcquireStatus tryAcquire(int... permits) {
+ /** @return {@link #SUCCESS} if successfully acquired; otherwise, return the failed index. */
+ public int tryAcquire(int... permits) {
Preconditions.assertTrue(permits.length == resources.size(),
() -> "items.length = " + permits.length + " != resources.size() = " + resources.size());
int i = 0;
@@ -126,24 +122,16 @@ public class ResourceSemaphore extends Semaphore {
}
}
-
if (i == permits.length) {
- return ResourceAcquireStatus.SUCCESS; // successfully acquired all resources
- }
-
- ResourceAcquireStatus acquireStatus;
- if (i == 0) {
- acquireStatus = ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
- } else {
- acquireStatus = ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
+ return SUCCESS; // successfully acquired all resources
}
// failed at i, releasing all previous resources
- for(i--; i >= 0; i--) {
- resources.get(i).release(permits[i]);
+ for(int k = i - 1; k >= 0; k--) {
+ resources.get(k).release(permits[k]);
}
- return acquireStatus;
+ return i;
}
public void acquire(int... permits) throws InterruptedException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1b954d6a7..8fe285a2c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -595,7 +595,7 @@ public class GrpcLogAppender extends LogAppenderBase {
final String requestId = UUID.randomUUID().toString();
try {
snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
- requestTimeoutDuration, responseHandler);
+ requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
if (isRunning()) {
snapshotRequestObserver.onNext(request);
@@ -646,7 +646,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
try {
snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
- requestTimeoutDuration, responseHandler);
+ requestTimeoutDuration, 0, responseHandler);
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime(false);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index c3f8730e7..d1bb70728 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -137,8 +137,8 @@ public class GrpcServerProtocolClient implements Closeable {
}
StreamObserver<InstallSnapshotRequestProto> installSnapshot(
- String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
- return StreamObserverWithTimeout.newInstance(name, timeout,
+ String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+ return StreamObserverWithTimeout.newInstance(name, timeout, limit,
i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 2b875f3ed..8fa30b1cc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
@@ -34,13 +35,19 @@ import java.util.function.IntSupplier;
public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
- public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+ public static <T> StreamObserverWithTimeout<T> newInstance(
+ String name, TimeDuration timeout, int outstandingLimit,
Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
final AtomicInteger responseCount = new AtomicInteger();
- final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
- r -> responseCount.getAndIncrement());
+ final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
+ final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(r -> {
+ responseCount.getAndIncrement();
+ if (semaphore != null) {
+ semaphore.release();
+ }
+ });
return new StreamObserverWithTimeout<>(
- name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+ name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
}
private final String name;
@@ -51,17 +58,37 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
private final AtomicBoolean isClose = new AtomicBoolean();
private final AtomicInteger requestCount = new AtomicInteger();
private final IntSupplier responseCount;
+ private final ResourceSemaphore semaphore;
private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
- StreamObserver<T> observer) {
+ ResourceSemaphore semaphore, StreamObserver<T> observer) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
this.timeout = timeout;
this.responseCount = responseCount;
+ this.semaphore = semaphore;
this.observer = observer;
}
+ private void acquire(T request) {
+ if (semaphore == null) {
+ return;
+ }
+ boolean acquired = false;
+ for (; !acquired && !isClose.get(); ) {
+ try {
+ acquired = semaphore.tryAcquire(timeout.getDuration(), timeout.getUnit());
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Interrupted onNext " + request, e);
+ }
+ }
+ if (!acquired) {
+ throw new IllegalStateException("Failed onNext " + request + ": already closed.");
+ }
+ }
+
@Override
public void onNext(T request) {
+ acquire(request);
observer.onNext(request);
final int id = requestCount.incrementAndGet();
scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 0812c29fd..2f2ca8f7d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -61,6 +61,12 @@ class PendingRequests {
static class Permit {}
+ /**
+ * The return type of {@link RequestLimits#tryAcquire(int)}.
+ * The order of the enum value must match the order in {@link RequestLimits}.
+ */
+ enum Acquired { FAILED_IN_ELEMENT_LIMIT, FAILED_IN_BYTE_SIZE_LIMIT, SUCCESS }
+
static class RequestLimits extends ResourceSemaphore.Group {
RequestLimits(int elementLimit, int megabyteLimit) {
super(elementLimit, megabyteLimit);
@@ -74,8 +80,9 @@ class PendingRequests {
return get(1).used();
}
- ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
- return tryAcquire(1, messageSizeMb);
+ Acquired tryAcquire(int messageSizeMb) {
+ final int acquired = tryAcquire(1, messageSizeMb);
+ return acquired == SUCCESS? PendingRequests.Acquired.SUCCESS: PendingRequests.Acquired.values()[acquired];
}
void releaseExtraMb(int extraMb) {
@@ -112,13 +119,13 @@ class PendingRequests {
Permit tryAcquire(Message message) {
final int messageSize = Message.getSize(message);
final int messageSizeMb = roundUpMb(messageSize );
- final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
+ final Acquired acquired = resource.tryAcquire(messageSizeMb);
LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
- if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
+ if (acquired == Acquired.FAILED_IN_ELEMENT_LIMIT) {
raftServerMetrics.onRequestQueueLimitHit();
raftServerMetrics.onResourceLimitHit();
return null;
- } else if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT) {
+ } else if (acquired == Acquired.FAILED_IN_BYTE_SIZE_LIMIT) {
raftServerMetrics.onRequestByteSizeLimitHit();
raftServerMetrics.onResourceLimitHit();
return null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index f52253b24..cdb6603c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -113,7 +113,7 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
final int numFiles = snapshot.getFiles().size();
if (fileIndex >= numFiles) {
- throw new NoSuchElementException();
+ throw new NoSuchElementException("fileIndex = " + fileIndex + " >= numFiles = " + numFiles);
}
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 0923b27fe..7434b2d79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -53,7 +53,7 @@ class GrpcTestClient implements Closeable {
}
static StreamObserverFactory withTimeout(TimeDuration timeout) {
- return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+ return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
i -> stub.withInterceptors(i).hello(responseHandler));
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
index dac58812d..1439f9b9d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -92,9 +92,6 @@ public class TestStreamObserverWithTimeout extends BaseTest {
final List<CompletableFuture<String>> futures = new ArrayList<>();
for (String m : messages) {
- if (type == Type.WithTimeout) {
- timeout.sleep();
- }
futures.add(client.send(m));
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
index d085161cf..6fe1aed7e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
@@ -24,17 +24,17 @@ import org.junit.Test;
import java.util.concurrent.TimeoutException;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.SUCCESS;
+import static org.apache.ratis.util.ResourceSemaphore.Group.SUCCESS;
public class TestResourceSemaphore extends BaseTest {
@Test(timeout = 5000)
public void testGroup() throws InterruptedException, TimeoutException {
+ final int FAILED_IN_ELEMENT_LIMIT = 0;
+ final int FAILED_IN_BYTE_SIZE_LIMIT = 1;
final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
assertUsed(g, 0, 0);
- assertAcquire(g, ResourceSemaphore.ResourceAcquireStatus.SUCCESS, 1, 1);
+ assertAcquire(g, SUCCESS, 1, 1);
assertUsed(g, 1, 1);
assertAcquire(g, FAILED_IN_BYTE_SIZE_LIMIT, 1, 1);
assertUsed(g, 1, 1);
@@ -86,9 +86,8 @@ public class TestResourceSemaphore extends BaseTest {
}
}
- static void assertAcquire(ResourceSemaphore.Group g, ResourceSemaphore.ResourceAcquireStatus expected,
- int... permits) {
- final ResourceSemaphore.ResourceAcquireStatus computed = g.tryAcquire(permits);
+ static void assertAcquire(ResourceSemaphore.Group g, int expected, int... permits) {
+ final int computed = g.tryAcquire(permits);
Assert.assertEquals(expected, computed);
}