You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/02 05:35:09 UTC

[pulsar-client-reactive] branch main updated: Add tests for AdaptedReactiveMessageSender (#75)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 68f7ad9  Add tests for AdaptedReactiveMessageSender (#75)
68f7ad9 is described below

commit 68f7ad9fb88cb373c2fe463be1bdb31e73b72192
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Dec 2 06:35:05 2022 +0100

    Add tests for AdaptedReactiveMessageSender (#75)
---
 .../adapter/AdaptedReactiveMessageSenderTest.java  | 178 +++++++++++++++++++++
 1 file changed, 178 insertions(+)

diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
new file mode 100644
index 0000000..8ab6d60
--- /dev/null
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.internal.adapter;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
+import org.apache.pulsar.reactive.client.api.MessageSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
+import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link AdaptedReactiveMessageSender}.
+ *
+ * <p>
+ * Each test spies on a {@link PulsarClientImpl} and stubs
+ * {@code createProducerAsync} so that the sender under test receives a mocked
+ * producer. The client is opened in a try-with-resources statement so that it is
+ * closed when the test ends, whether the test passes or fails.
+ *
+ * @author Christophe Bornet
+ */
+class AdaptedReactiveMessageSenderTest {
+
+	/**
+	 * Sends a single message through a sender built with every builder option set and
+	 * checks that the producer is requested with the matching
+	 * {@link ProducerConfigurationData}, that the message spec is applied to the typed
+	 * message builder, and that the message id produced by the builder is returned.
+	 * @throws Exception if the client cannot be created or closed
+	 */
+	@Test
+	void sendOne() throws Exception {
+		MessageRouter messageRouter = new MessageRouter() {
+		};
+		BatcherBuilder batcherBuilder = () -> null;
+		CryptoKeyReader cryptoKeyReader = mock(CryptoKeyReader.class);
+
+		// Producer configuration expected to result from the sender builder calls below.
+		ProducerConfigurationData expectedProducerConf = new ProducerConfigurationData();
+		expectedProducerConf.setTopicName("my-topic");
+		expectedProducerConf.setProducerName("my-producer");
+		expectedProducerConf.setSendTimeoutMs(TimeUnit.SECONDS.toMillis(1));
+		expectedProducerConf.setMaxPendingMessages(2);
+		expectedProducerConf.setMaxPendingMessagesAcrossPartitions(3);
+		expectedProducerConf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+		expectedProducerConf.setHashingScheme(HashingScheme.Murmur3_32Hash);
+		expectedProducerConf.setCryptoFailureAction(ProducerCryptoFailureAction.SEND);
+		expectedProducerConf.setCustomMessageRouter(messageRouter);
+		expectedProducerConf.setBatchingMaxPublishDelayMicros(TimeUnit.SECONDS.toMicros(4));
+		expectedProducerConf.setBatchingPartitionSwitchFrequencyByPublishDelay(5);
+		expectedProducerConf.setBatchingMaxMessages(6);
+		expectedProducerConf.setBatchingMaxBytes(7);
+		expectedProducerConf.setBatchingEnabled(false);
+		expectedProducerConf.setBatcherBuilder(batcherBuilder);
+		expectedProducerConf.setChunkingEnabled(true);
+		expectedProducerConf.setCryptoKeyReader(cryptoKeyReader);
+		expectedProducerConf.setEncryptionKeys(Collections.singleton("my-key"));
+		expectedProducerConf.setCompressionType(CompressionType.LZ4);
+		expectedProducerConf.setInitialSequenceId(8L);
+		expectedProducerConf.setAutoUpdatePartitions(true);
+		expectedProducerConf.setAutoUpdatePartitionsIntervalSeconds(9);
+		expectedProducerConf.setMultiSchema(true);
+		expectedProducerConf.setAccessMode(ProducerAccessMode.Exclusive);
+		expectedProducerConf.setLazyStartPartitionedProducers(true);
+
+		SortedMap<String, String> properties = new TreeMap<>();
+		properties.put("my-key", "my-value");
+		expectedProducerConf.setProperties(properties);
+
+		// The client is a closeable resource: close it when the test ends instead of
+		// leaking it.
+		try (PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build())) {
+			// mock(Class) cannot carry the type argument, hence the unchecked conversion.
+			@SuppressWarnings("unchecked")
+			ProducerBase<String> producer = mock(ProducerBase.class);
+			doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+			TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
+					new TypedMessageBuilderImpl<>(producer, Schema.STRING));
+			doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
+			doReturn(typedMessageBuilder).when(producer).newMessage();
+
+			// Catch-all stub first: any configuration fails producer creation. The more
+			// specific stub registered afterwards takes precedence for the expected
+			// configuration, so the send only succeeds if the configuration matches.
+			CompletableFuture<String> failedProducer = new CompletableFuture<>();
+			failedProducer.completeExceptionally(new RuntimeException("didn't match expected producer conf"));
+			doReturn(failedProducer).when(pulsarClient).createProducerAsync(any(), eq(Schema.STRING), isNull());
+			doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient)
+					.createProducerAsync(eq(expectedProducerConf), eq(Schema.STRING), isNull());
+
+			// clone() is part of the chain on purpose: the cloned builder must carry
+			// over every option set before it.
+			ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+					.messageSender(Schema.STRING)
+					.topic("my-topic")
+					.producerName("my-producer")
+					.sendTimeout(Duration.ofSeconds(1))
+					.maxPendingMessages(2)
+					.maxPendingMessagesAcrossPartitions(3)
+					.messageRoutingMode(MessageRoutingMode.CustomPartition)
+					.hashingScheme(HashingScheme.Murmur3_32Hash)
+					.cryptoFailureAction(ProducerCryptoFailureAction.SEND)
+					.messageRouter(messageRouter)
+					.batchingMaxPublishDelay(Duration.ofSeconds(4))
+					.roundRobinRouterBatchingPartitionSwitchFrequency(5)
+					.batchingMaxMessages(6)
+					.batchingMaxBytes(7)
+					.batchingEnabled(false)
+					.batcherBuilder(batcherBuilder)
+					.chunkingEnabled(true)
+					.cryptoKeyReader(cryptoKeyReader)
+					.encryptionKeys(Collections.singleton("my-key"))
+					.compressionType(CompressionType.LZ4)
+					.initialSequenceId(8)
+					.autoUpdatePartitions(true)
+					.autoUpdatePartitionsInterval(Duration.ofSeconds(9))
+					.multiSchema(true)
+					.accessMode(ProducerAccessMode.Exclusive)
+					.lazyStartPartitionedProducers(true)
+					.property("my-key", "my-value")
+					.clone()
+					.build();
+
+			MessageSpec<String> messageSpec = spy(MessageSpec.of("test"));
+			MessageId messageId = reactiveSender.sendOne(messageSpec).block(Duration.ofSeconds(5));
+
+			verify(pulsarClient).createProducerAsync(any(), any(), isNull());
+			verify((InternalMessageSpec<String>) messageSpec).configure(typedMessageBuilder);
+			assertThat(messageId).isEqualTo(MessageId.earliest);
+		}
+	}
+
+	/**
+	 * Sends two messages through {@code sendMany} and checks that the producer is
+	 * requested once, that the message ids are emitted in order, and that each value is
+	 * set on its own typed message builder before that builder is sent.
+	 * @throws Exception if the client cannot be created or closed
+	 */
+	@Test
+	void sendMany() throws Exception {
+		// The client is a closeable resource: close it when the test ends instead of
+		// leaking it.
+		try (PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build())) {
+			// mock(Class) cannot carry the type argument, hence the unchecked conversion.
+			@SuppressWarnings("unchecked")
+			ProducerBase<String> producer = mock(ProducerBase.class);
+			doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+			// One builder per message, each completing its send with a distinct id.
+			TypedMessageBuilderImpl<String> typedMessageBuilder1 = spy(
+					new TypedMessageBuilderImpl<>(producer, Schema.STRING));
+			doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder1).sendAsync();
+			TypedMessageBuilderImpl<String> typedMessageBuilder2 = spy(
+					new TypedMessageBuilderImpl<>(producer, Schema.STRING));
+			doReturn(CompletableFuture.completedFuture(MessageId.latest)).when(typedMessageBuilder2).sendAsync();
+
+			// Consecutive newMessage() calls hand out the first, then the second builder.
+			doReturn(typedMessageBuilder1, typedMessageBuilder2).when(producer).newMessage();
+			doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
+					eq(Schema.STRING), isNull());
+
+			ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+					.messageSender(Schema.STRING)
+					.topic("my-topic")
+					.build();
+
+			Flux<MessageSpec<String>> messageSpecs = Flux.just(MessageSpec.of("test1"), MessageSpec.of("test2"));
+			StepVerifier.create(reactiveSender.sendMany(messageSpecs))
+					.expectNext(MessageId.earliest)
+					.expectNext(MessageId.latest)
+					.verifyComplete();
+
+			verify(pulsarClient).createProducerAsync(any(), any(), isNull());
+			InOrder inOrder = Mockito.inOrder(typedMessageBuilder1, typedMessageBuilder2);
+			inOrder.verify(typedMessageBuilder1).value("test1");
+			inOrder.verify(typedMessageBuilder1).sendAsync();
+			inOrder.verify(typedMessageBuilder2).value("test2");
+			inOrder.verify(typedMessageBuilder2).sendAsync();
+		}
+	}
+
+}