You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/16 01:34:09 UTC
[pulsar] branch master updated: [Pulsar-Client] Extends
ProducerBuilder Validations (#3193)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60df0a9 [Pulsar-Client] Extends ProducerBuilder Validations (#3193)
60df0a9 is described below
commit 60df0a989dc541ee05b1484a859d72ba7c0c499b
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Sun Dec 16 01:34:04 2018 +0000
[Pulsar-Client] Extends ProducerBuilder Validations (#3193)
### Motivation
This PR aims to extend validations on `ProducerBuilder`.
### Modifications
1- `ProducerBuilder` needs to be robust for null and blank values as Public API.
(e.g: When `topicName` is passed as blank, it comes as `persistent://public/default/ `)
2- UT coverage is added.
---
.../pulsar/client/impl/ProducerBuilderImpl.java | 26 +--
.../client/impl/ProducerBuilderImplTest.java | 178 +++++++++++++++++++++
2 files changed, 195 insertions(+), 9 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 19df2e9..92526f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -45,6 +46,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import lombok.NonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
@@ -121,30 +124,28 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
@Override
public ProducerBuilder<T> topic(String topicName) {
+ checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank");
conf.setTopicName(topicName);
return this;
}
@Override
- public ProducerBuilder<T> producerName(@NonNull String producerName) {
+ public ProducerBuilder<T> producerName(String producerName) {
+ checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
conf.setProducerName(producerName);
return this;
}
@Override
public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
- if (sendTimeout < 0) {
- throw new IllegalArgumentException("sendTimeout needs to be >= 0");
- }
+ checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
- if (maxPendingMessages <= 0) {
- throw new IllegalArgumentException("maxPendingMessages needs to be > 0");
- }
+ checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@@ -198,7 +199,8 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
- public ProducerBuilder<T> addEncryptionKey(@NonNull String key) {
+ public ProducerBuilder<T> addEncryptionKey(String key) {
+ checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank");
conf.getEncryptionKeys().add(key);
return this;
}
@@ -228,13 +230,19 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
- public ProducerBuilder<T> property(@NonNull String key, @NonNull String value) {
+ public ProducerBuilder<T> property(String key, String value) {
+ checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
+ "property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder<T> properties(@NonNull Map<String, String> properties) {
+ checkArgument(!properties.isEmpty(), "properties cannot be empty");
+ properties.entrySet().forEach(entry ->
+ checkArgument(StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
+ "properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index e436e96..d6877c1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -24,7 +24,10 @@ import org.mockito.Matchers;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -53,6 +56,23 @@ public class ProducerBuilderImplTest {
}
@Test
+ public void testProducerBuilderImpl() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key2", "Test-Value2");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(2)
+ .addEncryptionKey("Test-EncryptionKey")
+ .property("Test-Key", "Test-Value")
+ .properties(properties)
+ .create();
+
+ assertNotNull(producer);
+ }
+
+ @Test
public void testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws PulsarClientException {
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
@@ -126,6 +146,164 @@ public class ProducerBuilderImplTest {
.create();
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenTopicNameIsBlank() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenProducerNameIsBlank() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenSendTimeoutIsNegative() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .sendTimeout(-1, TimeUnit.MILLISECONDS)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenMaxPendingMessagesIsNegative() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .producerName("Test-Producer")
+ .maxPendingMessages(-1)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenEncryptionKeyIsBlank() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .addEncryptionKey(" ")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(null, "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyKeyIsBlank() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property(" ", "Test-Value")
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertyValueIsBlank() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .property("Test-Key", " ")
+ .create();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testProducerBuilderImplWhenPropertiesIsNull() throws PulsarClientException {
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(null)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsNull() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(null, "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesKeyIsBlank() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(" ", "Test-Value");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsNull() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", null);
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesValueIsBlank() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("Test-Key", " ");
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenPropertiesIsEmpty() throws PulsarClientException {
+ Map<String, String> properties = new HashMap<>();
+
+ producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ producerBuilderImpl.topic(TOPIC_NAME)
+ .properties(properties)
+ .create();
+ }
+
private class CustomMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {