You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/21 23:29:05 UTC

[pulsar] branch master updated: [Pulsar-Client] Add Producer Builder API Numeric Properties Validations (#4539)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c8a44e  [Pulsar-Client] Add Producer Builder API Numeric Properties Validations (#4539)
3c8a44e is described below

commit 3c8a44e017c792de164547c536c14e277a5a0141
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon Jul 22 00:29:01 2019 +0100

    [Pulsar-Client] Add Producer Builder API Numeric Properties Validations (#4539)
    
    * [Pulsar-Client] Add Producer Numeric Properties Validation
    
    * Aligned deprecated and new Producer API validations
    
    * Deprecated and new Producer API validations are being aligned
    
    * batchingMaxMessages C++ API is being aligned with Java API
    
    * batchingMaxMessages Java API Validation is being aligned with C++ API
    
    * Review comments are addressed
    
    * Fix broken UTs
---
 .../client/api/v1/V1_ProducerConsumerTest.java     | 14 +++++----
 .../pulsar/client/impl/TopicFromMessageTest.java   | 20 ++++++------
 .../pulsar/client/api/ProducerConfiguration.java   | 10 ++----
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  7 ++---
 .../impl/conf/ProducerConfigurationData.java       | 36 +++++++++++++++++++++-
 .../client/impl/ProducerBuilderImplTest.java       | 28 +++++++++++++++++
 6 files changed, 86 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 8fdc5ed..8fd33bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -87,7 +87,9 @@ import org.testng.annotations.Test;
  * Basic tests using the deprecated client APIs from Pulsar-1.x
  */
 public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
+
     private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
+    private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;
 
     @BeforeMethod
     @Override
@@ -160,7 +162,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic2")
                 .batchingMaxMessages(5)
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
 
@@ -218,7 +220,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic3")
                 .batchingMaxMessages(5)
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
 
@@ -254,7 +256,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic4")
                 .batchingMaxMessages(5)
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
 
@@ -305,7 +307,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic5")
                 .batchingMaxMessages(5)
-                .batchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                .batchingMaxPublishDelay(2 * BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                 .enableBatching(batchMessageDelayMs != 0)
                 .sendTimeout(1, TimeUnit.SECONDS)
                 .create();
@@ -527,7 +529,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         // publish 100 messages so that the consumers blocked on receive() will now get the messages
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic7")
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                 .batchingMaxMessages(5)
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
@@ -1365,7 +1367,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic("persistent://my-property/use/my-ns/unacked-topic")
                     .enableBatching(batchMessageDelayMs != 0)
-                    .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
+                    .batchingMaxPublishDelay(BATCHING_MAX_PUBLISH_DELAY_THRESHOLD, TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(5)
                     .create();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
index 2242435..e74f52f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
@@ -28,7 +28,9 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TopicFromMessageTest extends ProducerConsumerBase {
-    private static final long testTimeout = 90000; // 1.5 min
+
+    private static final long TEST_TIMEOUT = 90000; // 1.5 min
+    private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2;
 
     @Override
     @BeforeMethod
@@ -43,7 +45,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test(timeOut = testTimeout)
+    @Test(timeOut = TEST_TIMEOUT)
     public void testSingleTopicConsumerNoBatchShortName() throws Exception {
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("topic1").subscriptionName("sub1").subscribe();
@@ -54,7 +56,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = testTimeout)
+    @Test(timeOut = TEST_TIMEOUT)
     public void testSingleTopicConsumerNoBatchFullName() throws Exception {
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe();
@@ -65,7 +67,7 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = testTimeout)
+    @Test(timeOut = TEST_TIMEOUT)
     public void testMultiTopicConsumerNoBatchShortName() throws Exception {
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
@@ -80,26 +82,26 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = testTimeout)
+    @Test(timeOut = TEST_TIMEOUT)
     public void testSingleTopicConsumerBatchShortName() throws Exception {
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("topic1").subscriptionName("sub1").subscribe();
              Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("topic1").enableBatching(true).batchingMaxMessages(1).create()) {
+                .topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create()) {
             producer.send("foobar".getBytes());
 
             Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
         }
     }
 
-    @Test(timeOut = testTimeout)
+    @Test(timeOut = TEST_TIMEOUT)
     public void testMultiTopicConsumerBatchShortName() throws Exception {
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
              Producer<byte[]> producer1 = pulsarClient.newProducer()
-                .topic("topic1").enableBatching(true).batchingMaxMessages(1).create();
+                .topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create();
              Producer<byte[]> producer2 = pulsarClient.newProducer()
-                .topic("topic2").enableBatching(true).batchingMaxMessages(1).create()) {
+                .topic("topic2").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create()) {
 
             producer1.send("foobar".getBytes());
             producer2.send("foobar".getBytes());
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 1af077b..8ea08bf 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -101,8 +101,7 @@ public class ProducerConfiguration implements Serializable {
      *            the time unit of the {@code sendTimeout}
      */
     public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
-        checkArgument(sendTimeout >= 0);
-        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+        conf.setSendTimeoutMs(sendTimeout, unit);
         return this;
     }
 
@@ -123,7 +122,6 @@ public class ProducerConfiguration implements Serializable {
      * @return
      */
     public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
-        checkArgument(maxPendingMessages > 0);
         conf.setMaxPendingMessages(maxPendingMessages);
         return this;
     }
@@ -154,7 +152,6 @@ public class ProducerConfiguration implements Serializable {
      * @param maxPendingMessagesAcrossPartitions
      */
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
-        checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
         conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
     }
 
@@ -399,9 +396,7 @@ public class ProducerConfiguration implements Serializable {
      * @return
      */
     public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
-        long delayInMs = timeUnit.toMillis(batchDelay);
-        checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
-        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+        conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
         return this;
     }
 
@@ -425,7 +420,6 @@ public class ProducerConfiguration implements Serializable {
      * @return
      */
     public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
-        checkArgument(batchMessagesMaxMessagesPerBatch > 0);
         conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 77f4dbe..d8ad226 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -123,21 +123,18 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
 
     @Override
     public ProducerBuilder<T> producerName(String producerName) {
-        checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
         conf.setProducerName(producerName);
         return this;
     }
 
     @Override
     public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
-        checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
-        conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+        conf.setSendTimeoutMs(sendTimeout, unit);
         return this;
     }
 
     @Override
     public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
-        checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
         conf.setMaxPendingMessages(maxPendingMessages);
         return this;
     }
@@ -205,7 +202,7 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
 
     @Override
     public ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, @NonNull TimeUnit timeUnit) {
-        conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+        conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 9da91f3..e5318b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -41,6 +42,8 @@ import com.google.common.collect.Sets;
 
 import lombok.Data;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
@@ -49,7 +52,6 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     private static final long serialVersionUID = 1L;
 
     private String topicName = null;
-
     private String producerName = null;
     private long sendTimeoutMs = 30000;
     private boolean blockIfQueueFull = false;
@@ -104,4 +106,36 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
             throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
         }
     }
+
+    public void setProducerName(String producerName) {
+        checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
+        this.producerName = producerName;
+    }
+
+    public void setMaxPendingMessages(int maxPendingMessages) {
+        checkArgument(maxPendingMessages > 0, "maxPendingMessages needs to be > 0");
+        this.maxPendingMessages = maxPendingMessages;
+    }
+
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    public void setBatchingMaxMessages(int batchingMaxMessages) {
+        checkArgument(batchingMaxMessages > 1, "batchingMaxMessages needs to be > 1");
+        this.batchingMaxMessages = batchingMaxMessages;
+    }
+
+    public void setSendTimeoutMs(int sendTimeout, TimeUnit timeUnit) {
+        checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
+        this.sendTimeoutMs = timeUnit.toMillis(sendTimeout);
+    }
+
+    public void setBatchingMaxPublishDelayMicros(long batchDelay, TimeUnit timeUnit) {
+        long delayInMs = timeUnit.toMillis(batchDelay);
+        checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+        this.batchingMaxPublishDelayMicros = timeUnit.toMicros(batchDelay);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index 862237a..35bd899 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -304,6 +304,34 @@ public class ProducerBuilderImplTest {
                 .create();
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenBatchingMaxPublishDelayPropertyIsNegative() {
+        producerBuilderImpl.batchingMaxPublishDelay(-1, TimeUnit.MILLISECONDS);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenBatchingMaxMessagesPropertyIsNegative() {
+        producerBuilderImpl.batchingMaxMessages(-1);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenSendTimeoutPropertyIsNegative() {
+        producerBuilderImpl.sendTimeout(-1, TimeUnit.SECONDS);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalid() {
+        producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenNumericPropertiesAreValid() {
+        producerBuilderImpl.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
+        producerBuilderImpl.batchingMaxMessages(2);
+        producerBuilderImpl.sendTimeout(1, TimeUnit.SECONDS);
+        producerBuilderImpl.maxPendingMessagesAcrossPartitions(1000);
+    }
+
     private class CustomMessageRouter implements MessageRouter {
         @Override
         public int choosePartition(Message<?> msg, TopicMetadata metadata) {