Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/07 12:18:15 UTC

[GitHub] [pulsar-client-reactive] lhotari opened a new pull request, #89: Add maxInflight test to AdaptedReactiveMessageSenderTest

lhotari opened a new pull request, #89:
URL: https://github.com/apache/pulsar-client-reactive/pull/89

   WIP: sendMany doesn't currently work with InflightLimiter
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-reactive] cbornet merged pull request #89: Fix issue with max inflight limit in sendMany

Posted by GitBox <gi...@apache.org>.
cbornet merged PR #89:
URL: https://github.com/apache/pulsar-client-reactive/pull/89




[GitHub] [pulsar-client-reactive] cbornet commented on a diff in pull request #89: Fix issue with max inflight limit in sendMany

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #89:
URL: https://github.com/apache/pulsar-client-reactive/pull/89#discussion_r1042339689


##########
pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java:
##########
@@ -318,4 +328,75 @@ void senderCacheEntryRecreatedIfProducerClosed() throws Exception {
 		assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4), Duration.ofSeconds(5));
 	}
 
+	@Test
+	void maxInFlightUsingSendOne() throws Exception {
+		doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
+				.flatMap((i) -> reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+	}
+
+	@Test
+	void maxInFlightUsingSendMany() throws Exception {
+		doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux.window(3).flatMap(
+				(subFlux) -> subFlux.map((i) -> MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+	}
+
+	void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>, Flux<MessageId>> sendingFunction)
+			throws Exception {
+		ScheduledExecutorService executorService = null;
+		try {
+			executorService = Executors.newSingleThreadScheduledExecutor();
+			final ScheduledExecutorService finalExecutorService = executorService;
+			PulsarClientImpl pulsarClient = spy(
+					(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+			AtomicLong totalRequests = new AtomicLong();
+			AtomicLong requestsMax = new AtomicLong();
+			ProducerBase<String> producer = mock(ProducerBase.class);
+			given(producer.closeAsync()).willReturn(CompletableFuture.completedFuture(null));
+			given(producer.isConnected()).willReturn(true);
+			given(producer.newMessage()).willAnswer((__) -> {
+				TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+						new TypedMessageBuilderImpl<>(producer, Schema.STRING));
+				given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
+					CompletableFuture<MessageId> messageSender = new CompletableFuture<>();
+					finalExecutorService.execute(() -> {
+						long current = totalRequests.incrementAndGet();
+						requestsMax.accumulateAndGet(current, Math::max);
+					});
+					finalExecutorService.schedule(() -> {
+						totalRequests.decrementAndGet();
+						// encode integer in message value to entry id in message id
+						int encodedEntryId = Integer.parseInt(typedMessageBuilder.getMessage().getValue());
+						messageSender.complete(
+								DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1));
+					}, 5, TimeUnit.MILLISECONDS);
+					return messageSender;
+				});
+				return typedMessageBuilder;
+			});
+
+			given(pulsarClient.createProducerAsync(any(), eq(Schema.STRING), isNull()))
+					.willReturn(CompletableFuture.completedFuture(producer));
+
+			ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+					.messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())

Review Comment:
   We could test several values of maxInFlight with parameterized tests, as was done for InflightLimiterTest.
   This can be done in a follow-up.
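
   For illustration only, here is a rough sketch of what checking several limits could look like. It is plain Java with no JUnit or Pulsar dependencies, so it is not the project's test. The class name MaxInflightSketch, the method observedPeak, and the Semaphore standing in for InflightLimiter are all assumptions made for this example. It reuses the counting idea from the diff above: increment a counter when a send starts, record the running maximum, and decrement when the simulated send completes 5 ms later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: none of these names come from pulsar-client-reactive.
class MaxInflightSketch {

    // Simulates `messages` sends with at most `maxInflight` outstanding
    // and returns the highest number of sends seen in flight at once.
    static long observedPeak(int maxInflight, int messages) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Assumption: a Semaphore stands in for the real InflightLimiter.
            Semaphore permits = new Semaphore(maxInflight);
            AtomicLong inflight = new AtomicLong();
            AtomicLong peak = new AtomicLong();
            List<CompletableFuture<Void>> results = new ArrayList<>();
            for (int i = 0; i < messages; i++) {
                permits.acquire(); // blocks while the limit is reached
                long current = inflight.incrementAndGet();
                peak.accumulateAndGet(current, Math::max);
                CompletableFuture<Void> result = new CompletableFuture<>();
                // Complete the fake send after 5 ms, as the mocked sendAsync does.
                executor.schedule(() -> {
                    inflight.decrementAndGet();
                    permits.release();
                    result.complete(null);
                }, 5, TimeUnit.MILLISECONDS);
                results.add(result);
            }
            CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
                    .get(10, TimeUnit.SECONDS);
            return peak.get();
        } catch (Exception e) {
            throw new IllegalStateException("send simulation failed", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The "parameters": several limits exercised by the same check.
        for (int maxInflight : new int[] {1, 7, 50}) {
            long peak = observedPeak(maxInflight, 100);
            if (peak < 1 || peak > maxInflight) {
                throw new AssertionError("limit " + maxInflight + " exceeded: " + peak);
            }
            System.out.println("maxInflight=" + maxInflight + " observed peak=" + peak);
        }
    }
}
```

   In the real test the loop would presumably become a JUnit parameterized test that passes each value to doTestMaxInFlight. The sketch only asserts that the peak never exceeds the limit, since whether the peak reaches the limit exactly depends on timing.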


