You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/07 10:06:09 UTC

[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #83: Fix bugs in InflightLimiter and add tests

cbornet commented on code in PR #83:
URL: https://github.com/apache/pulsar-client-reactive/pull/83#discussion_r1042006311


##########
pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiterTest.java:
##########
@@ -16,50 +16,174 @@
 
 package org.apache.pulsar.reactive.client.internal.api;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler.Worker;
 import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
+import reactor.util.function.Tuple2;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 class InflightLimiterTest {
 
-	@Test
-	void shouldLimitInflight() {
-		List<Integer> values = Collections.synchronizedList(new ArrayList<>());
-		InflightLimiter inflightLimiter = new InflightLimiter(48, 24, Schedulers.single(),
+	@ParameterizedTest
+	@CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+	void shouldNotRequestOrSubscribeMoreThanMaxInflightForMonos(int maxInflight, int maxElements) {
+		InflightLimiter inflightLimiter = new InflightLimiter(maxInflight, maxInflight, Schedulers.single(),
 				InflightLimiter.DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
-		Flux.merge(Arrays.asList(
-				Flux.range(1, 100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
-				Flux.range(101, 100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator),
-				Flux.range(201, 100).publishOn(Schedulers.parallel()).log().as(inflightLimiter::createOperator)))
-				.as(StepVerifier::create).expectSubscription().recordWith(() -> values).expectNextCount(300)
-				.expectComplete().verify();
-		assertThat(values)
-				.containsExactlyInAnyOrderElementsOf(IntStream.range(1, 301).boxed().collect(Collectors.toList()));
-		// verify "fairness"
-		// TODO: this is flaky, fix it
-		// verifyFairness(values);
+
+		AtomicLong totalRequests = new AtomicLong();
+		AtomicLong requestsMax = new AtomicLong();
+		AtomicInteger subscriptionsActiveBeforeCompletingFirstElement = new AtomicInteger();
+		AtomicInteger subscriptionsMax = new AtomicInteger();
+
+		List<Integer> inputValues = IntStream.rangeClosed(1, maxElements).boxed().collect(Collectors.toList());
+
+		Worker worker = Schedulers.boundedElastic().createWorker();
+		try {
+			Flux<Integer> flux = Flux.fromIterable(inputValues).flatMap((i) -> {
+				AtomicLong currentRequests = new AtomicLong();
+				return Mono.delay(Duration.ofMillis(25L)).thenReturn(i).doOnSubscribe((subscription) -> {
+					worker.schedule(() -> {
+						int currentSubscriptionsCount = subscriptionsActiveBeforeCompletingFirstElement
+								.incrementAndGet();
+						subscriptionsMax.accumulateAndGet(currentSubscriptionsCount, Math::max);
+					});
+				}).doOnRequest((requested) -> {
+					worker.schedule(() -> {
+						currentRequests.addAndGet(requested);
+						long current = totalRequests.addAndGet(requested);
+						requestsMax.accumulateAndGet(current, Math::max);
+					});
+				}).doOnNext((__) -> {
+					worker.schedule(() -> {
+						currentRequests.decrementAndGet();
+						totalRequests.decrementAndGet();

Review Comment:
   Is this needed, given that we set currentRequests to 0 at L80?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org