You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/21 23:29:05 UTC
[pulsar] branch master updated: [Pulsar-Client] Add Producer
Builder API Numeric Properties Validations (#4539)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c8a44e [Pulsar-Client] Add Producer Builder API Numeric Properties Validations (#4539)
3c8a44e is described below
commit 3c8a44e017c792de164547c536c14e277a5a0141
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon Jul 22 00:29:01 2019 +0100
[Pulsar-Client] Add Producer Builder API Numeric Properties Validations (#4539)
* [Pulsar-Client] Add Producer Numeric Properties Validation
* Aligned deprecated and new Producer API validations
* Deprecated and new Producer API validations are being aligned
* batchingMaxMessages C++ API is being aligned with Java API
* batchingMaxMessages Java API Validation is being aligned with C++ API
* Review comments are addressed
* Fix broken UTs
---
.../client/api/v1/V1_ProducerConsumerTest.java | 14 +++++----
.../pulsar/client/impl/TopicFromMessageTest.java | 20 ++++++------
.../pulsar/client/api/ProducerConfiguration.java | 10 ++----
.../pulsar/client/impl/ProducerBuilderImpl.java | 7 ++---
.../impl/conf/ProducerConfigurationData.java | 36 +++++++++++++++++++++-
.../client/impl/ProducerBuilderImplTest.java | 28 +++++++++++++++++
6 files changed, 86 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 8fdc5ed..8fd33bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -87,7 +87,9 @@ import org.testng.annotations.Test;
* Basic tests using the deprecated client APIs from Pulsar-1.x
*/
public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
+
private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
+ private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;
@BeforeMethod
@Override
@@ -160,7 +162,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic2")
.batchingMaxMessages(5)
- .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.enableBatching(batchMessageDelayMs != 0)
.create();
@@ -218,7 +220,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic3")
.batchingMaxMessages(5)
- .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.enableBatching(batchMessageDelayMs != 0)
.create();
@@ -254,7 +256,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic4")
.batchingMaxMessages(5)
- .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.enableBatching(batchMessageDelayMs != 0)
.create();
@@ -305,7 +307,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic5")
.batchingMaxMessages(5)
- .batchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(2 * BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.enableBatching(batchMessageDelayMs != 0)
.sendTimeout(1, TimeUnit.SECONDS)
.create();
@@ -527,7 +529,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// publish 100 messages so that the consumers blocked on receive() will now get the messages
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic7")
- .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5)
.enableBatching(batchMessageDelayMs != 0)
.create();
@@ -1365,7 +1367,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/unacked-topic")
.enableBatching(batchMessageDelayMs != 0)
- .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+ .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5)
.create();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
index 2242435..e74f52f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
@@ -28,7 +28,9 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TopicFromMessageTest extends ProducerConsumerBase {
- private static final long testTimeout = 90000; // 1.5 min
+
+ private static final long TEST_TIMEOUT = 90000; // 1.5 min
+ private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2;
@Override
@BeforeMethod
@@ -43,7 +45,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
super.internalCleanup();
}
- @Test(timeOut = testTimeout)
+ @Test(timeOut = TEST_TIMEOUT)
public void testSingleTopicConsumerNoBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("topic1").subscriptionName("sub1").subscribe();
@@ -54,7 +56,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = testTimeout)
+ @Test(timeOut = TEST_TIMEOUT)
public void testSingleTopicConsumerNoBatchFullName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe();
@@ -65,7 +67,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = testTimeout)
+ @Test(timeOut = TEST_TIMEOUT)
public void testMultiTopicConsumerNoBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
@@ -80,26 +82,26 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
}
}
- @Test(timeOut = testTimeout)
+ @Test(timeOut = TEST_TIMEOUT)
public void testSingleTopicConsumerBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("topic1").subscriptionName("sub1").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic("topic1").enableBatching(true).batchingMaxMessages(1).create()) {
+ .topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create()) {
producer.send("foobar".getBytes());
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
}
}
- @Test(timeOut = testTimeout)
+ @Test(timeOut = TEST_TIMEOUT)
public void testMultiTopicConsumerBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
Producer<byte[]> producer1 = pulsarClient.newProducer()
- .topic("topic1").enableBatching(true).batchingMaxMessages(1).create();
+ .topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create();
Producer<byte[]> producer2 = pulsarClient.newProducer()
- .topic("topic2").enableBatching(true).batchingMaxMessages(1).create()) {
+ .topic("topic2").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create()) {
producer1.send("foobar".getBytes());
producer2.send("foobar".getBytes());
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 1af077b..8ea08bf 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -101,8 +101,7 @@ public class ProducerConfiguration implements Serializable {
* the time unit of the {@code sendTimeout}
*/
public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
- checkArgument(sendTimeout >= 0);
- conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+ conf.setSendTimeoutMs(sendTimeout, unit);
return this;
}
@@ -123,7 +122,6 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
- checkArgument(maxPendingMessages > 0);
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@@ -154,7 +152,6 @@ public class ProducerConfiguration implements Serializable {
* @param maxPendingMessagesAcrossPartitions
*/
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
- checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
}
@@ -399,9 +396,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
- long delayInMs = timeUnit.toMillis(batchDelay);
- checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
- conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+ conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
return this;
}
@@ -425,7 +420,6 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
- checkArgument(batchMessagesMaxMessagesPerBatch > 0);
conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 77f4dbe..d8ad226 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -123,21 +123,18 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
@Override
public ProducerBuilder<T> producerName(String producerName) {
- checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
conf.setProducerName(producerName);
return this;
}
@Override
public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
- checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
- conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+ conf.setSendTimeoutMs(sendTimeout, unit);
return this;
}
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
- checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@@ -205,7 +202,7 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
@Override
public ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, @NonNull TimeUnit timeUnit) {
- conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+ conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 9da91f3..e5318b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -41,6 +42,8 @@ import com.google.common.collect.Sets;
import lombok.Data;
+import static com.google.common.base.Preconditions.checkArgument;
+
@Data
@NoArgsConstructor
@AllArgsConstructor
@@ -49,7 +52,6 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
private String topicName = null;
-
private String producerName = null;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
@@ -104,4 +106,36 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
}
}
+
+ public void setProducerName(String producerName) {
+ checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
+ this.producerName = producerName;
+ }
+
+ public void setMaxPendingMessages(int maxPendingMessages) {
+ checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
+ this.maxPendingMessages = maxPendingMessages;
+ }
+
+ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+ checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+ this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+ }
+
+ public void setBatchingMaxMessages(int batchingMaxMessages) {
+ checkArgument(batchingMaxMessages > 1, "batchingMaxMessages needs to be > 1");
+ this.batchingMaxMessages = batchingMaxMessages;
+ }
+
+ public void setSendTimeoutMs(int sendTimeout, TimeUnit timeUnit) {
+ checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
+ this.sendTimeoutMs = timeUnit.toMillis(sendTimeout);
+ }
+
+ public void setBatchingMaxPublishDelayMicros(long batchDelay, TimeUnit timeUnit) {
+ long delayInMs = timeUnit.toMillis(batchDelay);
+ checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+ this.batchingMaxPublishDelayMicros = timeUnit.toMicros(batchDelay);
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index 862237a..35bd899 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -304,6 +304,34 @@ public class ProducerBuilderImplTest {
.create();
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenBatchingMaxPublishDelayPropertyIsNegative() {
+ producerBuilderImpl.batchingMaxPublishDelay(-1, TimeUnit.MILLISECONDS);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenBatchingMaxMessagesPropertyIsNegative() {
+ producerBuilderImpl.batchingMaxMessages(-1);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenSendTimeoutPropertyIsNegative() {
+ producerBuilderImpl.sendTimeout(-1, TimeUnit.SECONDS);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalid() {
+ producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
+ }
+
+ @Test
+ public void testProducerBuilderImplWhenNumericPropertiesAreValid() {
+ producerBuilderImpl.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
+ producerBuilderImpl.batchingMaxMessages(2);
+ producerBuilderImpl.sendTimeout(1, TimeUnit.SECONDS);
+ producerBuilderImpl.maxPendingMessagesAcrossPartitions(1000);
+ }
+
private class CustomMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {