You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/18 09:49:09 UTC

[pulsar] branch master updated: [Pulsar-Client] Extends ConsumerBuilder Validations (#3200)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb459c  [Pulsar-Client] Extends ConsumerBuilder Validations (#3200)
6bb459c is described below

commit 6bb459ce42d77c7a47bcffa3f9c61a56c29fdb76
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Tue Dec 18 09:49:04 2018 +0000

    [Pulsar-Client] Extends ConsumerBuilder Validations (#3200)
    
    ### Motivation
    This PR extends the argument validations on `ConsumerBuilder`, since it is part of the public API.
    
    ### Modifications
    1- As a public API, `ConsumerBuilder` needs to be robust against **null** and **blank** values.
    2- New unit-test (UT) coverage is added.
---
 .../client/api/SimpleProducerConsumerTest.java     |   8 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  36 +++-
 .../client/impl/ConsumerBuilderImplTest.java       | 223 +++++++++++++++++++++
 3 files changed, 253 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index d38ca35..ae70fce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -489,16 +489,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName(null)
                     .subscribe();
             Assert.fail("Should fail");
-        } catch (PulsarClientException e) {
-            assertEquals(e.getClass(), InvalidConfigurationException.class);
+        } catch (PulsarClientException | IllegalArgumentException e) {
+            assertEquals(e.getClass(), IllegalArgumentException.class);
         }
 
         try {
             pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName("")
                     .subscribe();
             Assert.fail("Should fail");
-        } catch (PulsarClientException e) {
-            Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
+        } catch (PulsarClientException | IllegalArgumentException e) {
+            Assert.assertTrue(e instanceof IllegalArgumentException);
         }
 
         try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index b83804b..1060460 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -119,14 +119,20 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> topic(String... topicNames) {
-        checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
+        checkArgument(topicNames != null && topicNames.length > 0,
+                "Passed in topicNames should not be null or empty.");
+        Arrays.stream(topicNames).forEach(topicName ->
+                checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
         conf.getTopicNames().addAll(Lists.newArrayList(topicNames));
         return this;
     }
 
     @Override
     public ConsumerBuilder<T> topics(List<String> topicNames) {
-        checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
+        checkArgument(topicNames != null && !topicNames.isEmpty(),
+                "Passed in topicNames list should not be null or empty.");
+        topicNames.stream().forEach(topicName ->
+                checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
         conf.getTopicNames().addAll(topicNames);
         return this;
     }
@@ -147,6 +153,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> subscriptionName(String subscriptionName) {
+        checkArgument(StringUtils.isNotBlank(subscriptionName), "subscriptionName cannot be blank");
         conf.setSubscriptionName(subscriptionName);
         return this;
     }
@@ -172,39 +179,40 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
-    public ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener) {
+    public ConsumerBuilder<T> consumerEventListener(@NonNull ConsumerEventListener consumerEventListener) {
         conf.setConsumerEventListener(consumerEventListener);
         return this;
     }
 
     @Override
-    public ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+    public ConsumerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
         conf.setCryptoKeyReader(cryptoKeyReader);
         return this;
     }
 
     @Override
-    public ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
+    public ConsumerBuilder<T> cryptoFailureAction(@NonNull ConsumerCryptoFailureAction action) {
         conf.setCryptoFailureAction(action);
         return this;
     }
 
     @Override
     public ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize) {
-        checkArgument(receiverQueueSize >= 0);
+        checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
         conf.setReceiverQueueSize(receiverQueueSize);
         return this;
     }
 
     @Override
     public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) {
-        checkArgument(delay >= 0);
+        checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0");
         conf.setAcknowledgementsGroupTimeMicros(unit.toMicros(delay));
         return this;
     }
 
     @Override
     public ConsumerBuilder<T> consumerName(String consumerName) {
+        checkArgument(StringUtils.isNotBlank(consumerName), "consumerName cannot be blank");
         conf.setConsumerName(consumerName);
         return this;
     }
@@ -217,12 +225,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> property(String key, String value) {
+        checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
+                "property key/value cannot be blank");
         conf.getProperties().put(key, value);
         return this;
     }
 
     @Override
-    public ConsumerBuilder<T> properties(Map<String, String> properties) {
+    public ConsumerBuilder<T> properties(@NonNull Map<String, String> properties) {
+        checkArgument(!properties.isEmpty(), "properties cannot be empty");
+        properties.entrySet().forEach(entry ->
+                checkArgument(
+                        StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
+                        "properties' key/value cannot be blank"));
         conf.getProperties().putAll(properties);
         return this;
     }
@@ -246,13 +261,14 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
 	@Override
-	public ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+	public ConsumerBuilder<T> subscriptionInitialPosition(@NonNull SubscriptionInitialPosition
+                                                                      subscriptionInitialPosition) {
         conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
 		return this;
 	}
 
     @Override
-    public ConsumerBuilder<T> subscriptionTopicsMode(Mode mode) {
+    public ConsumerBuilder<T> subscriptionTopicsMode(@NonNull Mode mode) {
         conf.setSubscriptionTopicsMode(mode);
         return this;
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
new file mode 100644
index 0000000..daa313c
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Unit tests of {@link ConsumerBuilderImpl}.
+ */
+public class ConsumerBuilderImplTest {
+
+    private static final String TOPIC_NAME = "testTopicName";
+    private ConsumerBuilderImpl consumerBuilderImpl;
+
+    @BeforeTest
+    public void setup() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConsumerConfigurationData consumerConfigurationData = mock(ConsumerConfigurationData.class);
+        when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
+        when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
+        consumerBuilderImpl = new ConsumerBuilderImpl(client, consumerConfigurationData, Schema.BYTES);
+    }
+
+    @Test
+    public void testConsumerBuilderImpl() throws PulsarClientException {
+        Consumer consumer = mock(Consumer.class);
+        when(consumerBuilderImpl.subscribeAsync())
+                .thenReturn(CompletableFuture.completedFuture(consumer));
+        assertNotNull(consumerBuilderImpl.topic(TOPIC_NAME).subscribe());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesVarargsIsNull() {
+        consumerBuilderImpl.topic(null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesVarargsHasNullTopic() {
+        consumerBuilderImpl.topic("my-topic", null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesVarargsHasBlankTopic() {
+        consumerBuilderImpl.topic("my-topic", "  ");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesIsNull() {
+        consumerBuilderImpl.topics(null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesIsEmpty() {
+        consumerBuilderImpl.topics(Arrays.asList());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesHasBlankTopic() {
+        List<String> topicNames = Arrays.asList("my-topic", " ");
+        consumerBuilderImpl.topics(topicNames);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenTopicNamesHasNullTopic() {
+        List<String> topicNames = Arrays.asList("my-topic", null);
+        consumerBuilderImpl.topics(topicNames);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenSubscriptionNameIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName(null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenSubscriptionNameIsBlank() {
+        consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName(" ");
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenConsumerEventListenerIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME)
+                .subscriptionName("subscriptionName")
+                .consumerEventListener(null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenCryptoKeyReaderIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME)
+                .subscriptionName("subscriptionName")
+                .cryptoKeyReader(null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenCryptoFailureActionIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME)
+                .subscriptionName("subscriptionName")
+                .cryptoFailureAction(null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenConsumerNameIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).consumerName(null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenConsumerNameIsBlank() {
+        consumerBuilderImpl.topic(TOPIC_NAME).consumerName(" ");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertyKeyIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).property(null, "Test-Value");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertyKeyIsBlank() {
+        consumerBuilderImpl.topic(TOPIC_NAME).property("   ", "Test-Value");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertyValueIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).property("Test-Key", null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertyValueIsBlank() {
+        consumerBuilderImpl.topic(TOPIC_NAME).property("Test-Key", "   ");
+    }
+
+    @Test
+    public void testConsumerBuilderImplWhenPropertiesAreCorrect() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key", "Test-Value");
+        properties.put("Test-Key2", "Test-Value2");
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertiesKeyIsNull() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(null, "Test-Value");
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertiesKeyIsBlank() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("  ", "Test-Value");
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertiesValueIsNull() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key", null);
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertiesValueIsBlank() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key", "   ");
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPropertiesIsEmpty() {
+        Map<String, String> properties = new HashMap<>();
+
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(properties);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenPropertiesIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).properties(null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenSubscriptionInitialPositionIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).subscriptionInitialPosition(null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenSubscriptionTopicsModeIsNull() {
+        consumerBuilderImpl.topic(TOPIC_NAME).subscriptionTopicsMode(null);
+    }
+
+}