You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/16 01:34:09 UTC

[pulsar] branch master updated: [Pulsar-Client] Extends ProducerBuilder Validations (#3193)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 60df0a9  [Pulsar-Client] Extends ProducerBuilder Validations (#3193)
60df0a9 is described below

commit 60df0a989dc541ee05b1484a859d72ba7c0c499b
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Sun Dec 16 01:34:04 2018 +0000

    [Pulsar-Client] Extends ProducerBuilder Validations (#3193)
    
    ### Motivation
    This PR aims to extend validations on `ProducerBuilder`.
    
    ### Modifications
    1- As a public API, `ProducerBuilder` needs to be robust against null and blank values.
          (e.g. when `topicName` is passed as blank, it ends up as `persistent://public/default/    `)
    2- Unit-test coverage is added.
---
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  26 +--
 .../client/impl/ProducerBuilderImplTest.java       | 178 +++++++++++++++++++++
 2 files changed, 195 insertions(+), 9 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 19df2e9..92526f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.HashingScheme;
@@ -45,6 +46,8 @@ import org.apache.pulsar.common.util.FutureUtil;
 
 import lombok.NonNull;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
 
     private final PulsarClientImpl client;
@@ -121,30 +124,28 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
 
     @Override
     public ProducerBuilder<T> topic(String topicName) {
+        checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank");
         conf.setTopicName(topicName);
         return this;
     }
 
     @Override
-    public ProducerBuilder<T> producerName(@NonNull String producerName) {
+    public ProducerBuilder<T> producerName(String producerName) {
+        checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
         conf.setProducerName(producerName);
         return this;
     }
 
     @Override
     public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
-        if (sendTimeout < 0) {
-            throw new IllegalArgumentException("sendTimeout needs to be >= 0");
-        }
+        checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
         conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
         return this;
     }
 
     @Override
     public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
-        if (maxPendingMessages <= 0) {
-            throw new IllegalArgumentException("maxPendingMessages needs to be > 0");
-        }
+        checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
         conf.setMaxPendingMessages(maxPendingMessages);
         return this;
     }
@@ -198,7 +199,8 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
     @Override
-    public ProducerBuilder<T> addEncryptionKey(@NonNull String key) {
+    public ProducerBuilder<T> addEncryptionKey(String key) {
+        checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank");
         conf.getEncryptionKeys().add(key);
         return this;
     }
@@ -228,13 +230,19 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
     @Override
-    public ProducerBuilder<T> property(@NonNull String key, @NonNull String value) {
+    public ProducerBuilder<T> property(String key, String value) {
+        checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
+                "property key/value cannot be blank");
         conf.getProperties().put(key, value);
         return this;
     }
 
     @Override
     public ProducerBuilder<T> properties(@NonNull Map<String, String> properties) {
+        checkArgument(!properties.isEmpty(), "properties cannot be empty");
+        properties.entrySet().forEach(entry ->
+            checkArgument(StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
+                    "properties' key/value cannot be blank"));
         conf.getProperties().putAll(properties);
         return this;
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index e436e96..d6877c1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -24,7 +24,10 @@ import org.mockito.Matchers;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -53,6 +56,23 @@ public class ProducerBuilderImplTest {
     }
 
     @Test
+    public void testProducerBuilderImpl() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key2", "Test-Value2");
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .producerName("Test-Producer")
+                .maxPendingMessages(2)
+                .addEncryptionKey("Test-EncryptionKey")
+                .property("Test-Key", "Test-Value")
+                .properties(properties)
+                .create();
+
+        assertNotNull(producer);
+    }
+
+    @Test
     public void testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws PulsarClientException {
         producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
         Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
@@ -126,6 +146,164 @@ public class ProducerBuilderImplTest {
                 .create();
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenTopicNameIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(null)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenTopicNameIsBlank() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic("   ")
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenProducerNameIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .producerName(null)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenProducerNameIsBlank() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .producerName("   ")
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenSendTimeoutIsNegative() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .producerName("Test-Producer")
+                .sendTimeout(-1, TimeUnit.MILLISECONDS)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenMaxPendingMessagesIsNegative() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .producerName("Test-Producer")
+                .maxPendingMessages(-1)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .addEncryptionKey(null)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenEncryptionKeyIsBlank() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .addEncryptionKey("   ")
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertyKeyIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .property(null, "Test-Value")
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertyKeyIsBlank() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .property("   ", "Test-Value")
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertyValueIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .property("Test-Key", null)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertyValueIsBlank() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .property("Test-Key", "   ")
+                .create();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testProducerBuilderImplWhenPropertiesIsNull() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(null)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertiesKeyIsNull() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(null, "Test-Value");
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(properties)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertiesKeyIsBlank() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("   ", "Test-Value");
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(properties)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertiesValueIsNull() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key", null);
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(properties)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertiesValueIsBlank() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("Test-Key", "   ");
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(properties)
+                .create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenPropertiesIsEmpty() throws PulsarClientException {
+        Map<String, String> properties = new HashMap<>();
+
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .properties(properties)
+                .create();
+    }
+
     private class CustomMessageRouter implements MessageRouter {
         @Override
         public int choosePartition(Message<?> msg, TopicMetadata metadata) {