You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/18 09:49:09 UTC
[pulsar] branch master updated: [Pulsar-Client] Extends
ConsumerBuilder Validations (#3200)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb459c [Pulsar-Client] Extends ConsumerBuilder Validations (#3200)
6bb459c is described below
commit 6bb459ce42d77c7a47bcffa3f9c61a56c29fdb76
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Tue Dec 18 09:49:04 2018 +0000
[Pulsar-Client] Extends ConsumerBuilder Validations (#3200)
### Motivation
This PR extends the argument validation in `ConsumerBuilder`, which is part of the public API.
### Modifications
1- As a public API, `ConsumerBuilder` needs to be robust against **null** and **blank** values.
2- New unit-test (UT) coverage is added.
---
.../client/api/SimpleProducerConsumerTest.java | 8 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 36 +++-
.../client/impl/ConsumerBuilderImplTest.java | 223 +++++++++++++++++++++
3 files changed, 253 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d38ca35..ae70fce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -489,16 +489,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName(null)
.subscribe();
Assert.fail("Should fail");
- } catch (PulsarClientException e) {
- assertEquals(e.getClass(), InvalidConfigurationException.class);
+ } catch (PulsarClientException | IllegalArgumentException e) {
+ assertEquals(e.getClass(), IllegalArgumentException.class);
}
try {
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName("")
.subscribe();
Assert.fail("Should fail");
- } catch (PulsarClientException e) {
- Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
+ } catch (PulsarClientException | IllegalArgumentException e) {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
}
try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index b83804b..1060460 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -119,14 +119,20 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
@Override
public ConsumerBuilder<T> topic(String... topicNames) {
- checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
+ checkArgument(topicNames != null && topicNames.length > 0,
+ "Passed in topicNames should not be null or empty.");
+ Arrays.stream(topicNames).forEach(topicName ->
+ checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
conf.getTopicNames().addAll(Lists.newArrayList(topicNames));
return this;
}
@Override
public ConsumerBuilder<T> topics(List<String> topicNames) {
- checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
+ checkArgument(topicNames != null && !topicNames.isEmpty(),
+ "Passed in topicNames list should not be null or empty.");
+ topicNames.stream().forEach(topicName ->
+ checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
conf.getTopicNames().addAll(topicNames);
return this;
}
@@ -147,6 +153,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
@Override
public ConsumerBuilder<T> subscriptionName(String subscriptionName) {
+ checkArgument(StringUtils.isNotBlank(subscriptionName), "subscriptionName cannot be blank");
conf.setSubscriptionName(subscriptionName);
return this;
}
@@ -172,39 +179,40 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
- public ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener) {
+ public ConsumerBuilder<T> consumerEventListener(@NonNull ConsumerEventListener consumerEventListener) {
conf.setConsumerEventListener(consumerEventListener);
return this;
}
@Override
- public ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ public ConsumerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
@Override
- public ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
+ public ConsumerBuilder<T> cryptoFailureAction(@NonNull ConsumerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
@Override
public ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize) {
- checkArgument(receiverQueueSize >= 0);
+ checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
@Override
public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) {
- checkArgument(delay >= 0);
+ checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0");
conf.setAcknowledgementsGroupTimeMicros(unit.toMicros(delay));
return this;
}
@Override
public ConsumerBuilder<T> consumerName(String consumerName) {
+ checkArgument(StringUtils.isNotBlank(consumerName), "consumerName cannot be blank");
conf.setConsumerName(consumerName);
return this;
}
@@ -217,12 +225,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
@Override
public ConsumerBuilder<T> property(String key, String value) {
+ checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
+ "property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
- public ConsumerBuilder<T> properties(Map<String, String> properties) {
+ public ConsumerBuilder<T> properties(@NonNull Map<String, String> properties) {
+ checkArgument(!properties.isEmpty(), "properties cannot be empty");
+ properties.entrySet().forEach(entry ->
+ checkArgument(
+ StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
+ "properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
@@ -246,13 +261,14 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
- public ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+ public ConsumerBuilder<T> subscriptionInitialPosition(@NonNull SubscriptionInitialPosition
+ subscriptionInitialPosition) {
conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
@Override
- public ConsumerBuilder<T> subscriptionTopicsMode(Mode mode) {
+ public ConsumerBuilder<T> subscriptionTopicsMode(@NonNull Mode mode) {
conf.setSubscriptionTopicsMode(mode);
return this;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
new file mode 100644
index 0000000..daa313c
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Unit tests of {@link ConsumerBuilderImpl}.
+ */
+public class ConsumerBuilderImplTest {
+
+ private static final String TOPIC_NAME = "testTopicName";
+ private ConsumerBuilderImpl consumerBuilderImpl;
+
+ @BeforeTest
+ public void setup() {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConsumerConfigurationData consumerConfigurationData = mock(ConsumerConfigurationData.class);
+ when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
+ when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
+ consumerBuilderImpl = new ConsumerBuilderImpl(client, consumerConfigurationData, Schema.BYTES);
+ }
+
+ @Test
+ public void testConsumerBuilderImpl() throws PulsarClientException {
+ Consumer consumer = mock(Consumer.class);
+ when(consumerBuilderImpl.subscribeAsync())
+ .thenReturn(CompletableFuture.completedFuture(consumer));
+ assertNotNull(consumerBuilderImpl.topic(TOPIC_NAME).subscribe());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesVarargsIsNull() {
+ consumerBuilderImpl.topic(null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesVarargsHasNullTopic() {
+ consumerBuilderImpl.topic("my-topic", null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesVarargsHasBlankTopic() {
+ consumerBuilderImpl.topic("my-topic", " ");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesIsNull() {
+ consumerBuilderImpl.topics(null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesIsEmpty() {
+ consumerBuilderImpl.topics(Arrays.asList());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesHasBlankTopic() {
+ List<String> topicNames = Arrays.asList("my-topic", " ");
+ consumerBuilderImpl.topics(topicNames);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenTopicNamesHasNullTopic() {
+ List<String> topicNames = Arrays.asList("my-topic", null);
+ consumerBuilderImpl.topics(topicNames);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenSubscriptionNameIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName(null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenSubscriptionNameIsBlank() {
+ consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName(" ");
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenConsumerEventListenerIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME)
+ .subscriptionName("subscriptionName")
+ .consumerEventListener(null);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenCryptoKeyReaderIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME)
+ .subscriptionName("subscriptionName")
+ .cryptoKeyReader(null);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenCryptoFailureActionIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME)
+ .subscriptionName("subscriptionName")
+ .cryptoFailureAction(null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenConsumerNameIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).consumerName(null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenConsumerNameIsBlank() {
+ consumerBuilderImpl.topic(TOPIC_NAME).consumerName(" ");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertyKeyIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).property(null, "Test-Value");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertyKeyIsBlank() {
+ consumerBuilderImpl.topic(TOPIC_NAME).property(" ", "Test-Value");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertyValueIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).property("Test-Key", null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertyValueIsBlank() {
+ consumerBuilderImpl.topic(TOPIC_NAME).property("Test-Key", " ");
+ }
+
+ @Test
+ public void testConsumerBuilderImplWhenPropertiesAreCorrect() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", "Test-Value");
+ properties.put("Test-Key2", "Test-Value2");
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertiesKeyIsNull() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(null, "Test-Value");
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertiesKeyIsBlank() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(" ", "Test-Value");
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertiesValueIsNull() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", null);
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertiesValueIsBlank() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", " ");
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConsumerBuilderImplWhenPropertiesIsEmpty() {
+ Map<String, String> properties = new HashMap<>();
+
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenPropertiesIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).properties(null);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenSubscriptionInitialPositionIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).subscriptionInitialPosition(null);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testConsumerBuilderImplWhenSubscriptionTopicsModeIsNull() {
+ consumerBuilderImpl.topic(TOPIC_NAME).subscriptionTopicsMode(null);
+ }
+
+}