You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/12/01 07:52:44 UTC

[pulsar-client-reactive] branch main updated: Add test for ReactiveMessageConsumerBuilder (#60)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f005b8  Add test for ReactiveMessageConsumerBuilder (#60)
8f005b8 is described below

commit 8f005b846273fb711da282adebe48785f8e80c9c
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Dec 1 08:52:40 2022 +0100

    Add test for ReactiveMessageConsumerBuilder (#60)
---
 pulsar-client-reactive-api/build.gradle            |   6 +
 .../api/MutableReactiveMessageConsumerSpec.java    |   6 +
 .../api/ReactiveMessageConsumerBuilderTest.java    | 219 +++++++++++++++++++++
 3 files changed, 231 insertions(+)

diff --git a/pulsar-client-reactive-api/build.gradle b/pulsar-client-reactive-api/build.gradle
index f22b224..8dccdcd 100644
--- a/pulsar-client-reactive-api/build.gradle
+++ b/pulsar-client-reactive-api/build.gradle
@@ -9,6 +9,12 @@ dependencies {
 	api libs.pulsar.client.api
 	api libs.slf4j.api
 	implementation libs.jctools.core
+
+	testImplementation libs.junit.jupiter
+	testImplementation libs.junit.jupiter.params
+	testImplementation libs.reactor.test
+	testImplementation libs.assertj.core
+	testImplementation libs.bundles.log4j
 }
 
 description = "Reactive Java client for Apache Pulsar"
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
index 51e3c91..2ad8768 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
@@ -696,6 +696,12 @@ public class MutableReactiveMessageConsumerSpec implements ReactiveMessageConsum
 		if (consumerSpec.getAcknowledgementsGroupTime() != null) {
 			setAcknowledgementsGroupTime(consumerSpec.getAcknowledgementsGroupTime());
 		}
+		if (consumerSpec.getAcknowledgeAsynchronously() != null) {
+			setAcknowledgeAsynchronously(consumerSpec.getAcknowledgeAsynchronously());
+		}
+		if (consumerSpec.getAcknowledgeScheduler() != null) {
+			setAcknowledgeScheduler(consumerSpec.getAcknowledgeScheduler());
+		}
 		if (consumerSpec.getNegativeAckRedeliveryDelay() != null) {
 			setNegativeAckRedeliveryDelay(consumerSpec.getNegativeAckRedeliveryDelay());
 		}
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilderTest.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilderTest.java
new file mode 100644
index 0000000..d5220db
--- /dev/null
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilderTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.api;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.jupiter.api.Test;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link ReactiveMessageConsumerBuilder},
+ * {@link MutableReactiveMessageConsumerSpec} and
+ * {@link ImmutableReactiveMessageConsumerSpec}.
+ *
+ * @author Christophe Bornet
+ */
+class ReactiveMessageConsumerBuilderTest {
+
+	private static final KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange();
+
+	private static final CryptoKeyReader cryptoKeyReader = new TestCryptoKeyReader();
+
+	private static final DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().build();
+
+	private static final Scheduler scheduler = Schedulers.newSingle("my-sched");
+
+	@Test
+	void emptyBuilder() {
+		MutableReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder().getMutableSpec();
+		assertThat(spec).hasAllNullFieldsOrPropertiesExcept("topicNames");
+		assertThat(spec.getTopicNames()).isEmpty();
+	}
+
+	@Test
+	void getMutableSpec() {
+		assertConsumerSpecWithAllValues(createConsumerSpec());
+	}
+
+	@Test
+	void mutableSpecFromReactiveMessageConsumerSpec() {
+		ReactiveMessageConsumerSpec spec = new MutableReactiveMessageConsumerSpec(createConsumerSpec());
+		assertConsumerSpecWithAllValues(spec);
+	}
+
+	@Test
+	void immutableSpecFromReactiveMessageConsumerSpec() {
+		ReactiveMessageConsumerSpec spec = new ImmutableReactiveMessageConsumerSpec(createConsumerSpec());
+		assertConsumerSpecWithAllValues(spec);
+	}
+
+	@Test
+	void toImmutableSpec() {
+		ReactiveMessageConsumerSpec spec = createConsumerBuilder().toImmutableSpec();
+		assertConsumerSpecWithAllValues(spec);
+	}
+
+	@Test
+	void applySpec() {
+		ReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder().applySpec(createConsumerSpec())
+				.getMutableSpec();
+		assertConsumerSpecWithAllValues(spec);
+	}
+
+	@Test
+	void topics() {
+		ArrayList<String> topics = new ArrayList<>();
+		topics.add("topic-1");
+		ReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder().topic("ignored-1")
+				.topic("ignored-2", "ignored-3").topics(topics).topic("topic-2").topic("topic-3", "topic-4")
+				.toImmutableSpec();
+		assertThat(spec.getTopicNames()).containsExactly("topic-1", "topic-2", "topic-3", "topic-4");
+	}
+
+	@Test
+	void properties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put("key-1", "value-1");
+		ReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder()
+				.property("key-ignored", "property-ignored").properties(properties).property("key-2", "value-2")
+				.toImmutableSpec();
+		assertThat(spec.getProperties()).hasSize(2).containsEntry("key-1", "value-1").containsEntry("key-2", "value-2");
+	}
+
+	@Test
+	void subscriptionProperties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put("key-1", "value-1");
+		ReactiveMessageConsumerSpec spec = new TestReactiveMessageConsumerBuilder()
+				.subscriptionProperty("key-ignored", "property-ignored").subscriptionProperties(properties)
+				.subscriptionProperty("key-2", "value-2").toImmutableSpec();
+		assertThat(spec.getSubscriptionProperties()).hasSize(2).containsEntry("key-1", "value-1").containsEntry("key-2",
+				"value-2");
+	}
+
+	private void assertConsumerSpecWithAllValues(ReactiveMessageConsumerSpec spec) {
+		assertThat(spec.getTopicNames()).containsExactly("my-topic");
+		assertThat(spec.getTopicsPattern().pattern()).isEqualTo("my-topic-*");
+		assertThat(spec.getTopicsPatternSubscriptionMode()).isEqualTo(RegexSubscriptionMode.PersistentOnly);
+		assertThat(spec.getTopicsPatternAutoDiscoveryPeriod()).hasSeconds(1);
+		assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
+		assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable);
+		assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
+		assertThat(spec.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Latest);
+		assertThat(spec.getKeySharedPolicy()).isSameAs(keySharedPolicy);
+		assertThat(spec.getReplicateSubscriptionState()).isTrue();
+		assertThat(spec.getSubscriptionProperties()).hasSize(1).containsEntry("my-sub-key", "my-sub-value");
+		assertThat(spec.getConsumerName()).isEqualTo("my-consumer");
+		assertThat(spec.getProperties()).hasSize(1).containsEntry("my-key", "my-value");
+		assertThat(spec.getPriorityLevel()).isEqualTo(2);
+		assertThat(spec.getReadCompacted()).isTrue();
+		assertThat(spec.getBatchIndexAckEnabled()).isTrue();
+		assertThat(spec.getAckTimeout()).hasSeconds(3);
+		assertThat(spec.getAckTimeoutTickTime()).hasSeconds(4);
+		assertThat(spec.getAcknowledgementsGroupTime()).hasSeconds(5);
+		assertThat(spec.getAcknowledgeAsynchronously()).isTrue();
+		assertThat(spec.getAcknowledgeScheduler()).isSameAs(scheduler);
+		assertThat(spec.getNegativeAckRedeliveryDelay()).hasSeconds(6);
+		assertThat(spec.getDeadLetterPolicy()).isSameAs(deadLetterPolicy);
+		assertThat(spec.getRetryLetterTopicEnable()).isTrue();
+		assertThat(spec.getReceiverQueueSize()).isEqualTo(7);
+		assertThat(spec.getMaxTotalReceiverQueueSizeAcrossPartitions()).isEqualTo(8);
+		assertThat(spec.getAutoUpdatePartitions()).isTrue();
+		assertThat(spec.getAutoUpdatePartitionsInterval()).hasSeconds(9);
+		assertThat(spec.getCryptoKeyReader()).isSameAs(cryptoKeyReader);
+		assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL);
+		assertThat(spec.getMaxPendingChunkedMessage()).isEqualTo(10);
+		assertThat(spec.getAutoAckOldestChunkedMessageOnQueueFull()).isTrue();
+		assertThat(spec.getExpireTimeOfIncompleteChunkedMessage()).hasSeconds(11);
+	}
+
+	private ReactiveMessageConsumerSpec createConsumerSpec() {
+		return createConsumerBuilder().getMutableSpec();
+	}
+
+	private ReactiveMessageConsumerBuilder<String> createConsumerBuilder() {
+		return new TestReactiveMessageConsumerBuilder().topic("my-topic").topicsPattern(Pattern.compile("my-topic-*"))
+				.topicsPatternSubscriptionMode(RegexSubscriptionMode.PersistentOnly)
+				.topicsPatternAutoDiscoveryPeriod(Duration.ofSeconds(1)).subscriptionName("my-sub")
+				.subscriptionMode(SubscriptionMode.Durable).subscriptionType(SubscriptionType.Exclusive)
+				.subscriptionInitialPosition(SubscriptionInitialPosition.Latest).keySharedPolicy(keySharedPolicy)
+				.replicateSubscriptionState(true).subscriptionProperty("my-sub-key", "my-sub-value")
+				.consumerName("my-consumer").priorityLevel(2).readCompacted(true).property("my-key", "my-value")
+				.batchIndexAckEnabled(true).ackTimeout(Duration.ofSeconds(3)).ackTimeoutTickTime(Duration.ofSeconds(4))
+				.acknowledgementsGroupTime(Duration.ofSeconds(5)).acknowledgeAsynchronously(true)
+				.acknowledgeScheduler(scheduler).negativeAckRedeliveryDelay(Duration.ofSeconds(6))
+				.deadLetterPolicy(deadLetterPolicy).retryLetterTopicEnable(true).receiverQueueSize(7)
+				.maxTotalReceiverQueueSizeAcrossPartitions(8).autoUpdatePartitions(true)
+				.autoUpdatePartitionsInterval(Duration.ofSeconds(9)).cryptoKeyReader(cryptoKeyReader)
+				.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).maxPendingChunkedMessage(10)
+				.autoAckOldestChunkedMessageOnQueueFull(true)
+				.expireTimeOfIncompleteChunkedMessage(Duration.ofSeconds(11));
+	}
+
+	static class TestReactiveMessageConsumerBuilder implements ReactiveMessageConsumerBuilder<String> {
+
+		MutableReactiveMessageConsumerSpec consumerSpec = new MutableReactiveMessageConsumerSpec();
+
+		@Override
+		public MutableReactiveMessageConsumerSpec getMutableSpec() {
+			return this.consumerSpec;
+		}
+
+		@Override
+		public ReactiveMessageConsumerBuilder<String> clone() {
+			return null;
+		}
+
+		@Override
+		public ReactiveMessageConsumer<String> build() {
+			return null;
+		}
+
+	}
+
+	static class TestCryptoKeyReader implements CryptoKeyReader {
+
+		@Override
+		public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+			return null;
+		}
+
+		@Override
+		public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+			return null;
+		}
+
+	}
+
+}