You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/03 22:06:07 UTC

[pulsar] branch master updated: [Pulsar-Client] Add Consumer Numeric Properties Validation (#4446)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 870a281  [Pulsar-Client] Add Consumer Numeric Properties Validation (#4446)
870a281 is described below

commit 870a2814b514187a81743d80061a129ba4018cf3
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon Jun 3 23:06:01 2019 +0100

    [Pulsar-Client] Add Consumer Numeric Properties Validation (#4446)
---
 README.md                                          | 15 ++++++++++-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  6 ++++-
 .../client/impl/ConsumerBuilderImplTest.java       | 29 ++++++++++++++++++++++
 .../version-2.1.0-incubating/io-rabbitmq.md        |  4 +--
 4 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 5f48a72..706980e 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@
 Pulsar is a distributed pub-sub messaging platform with a very
 flexible messaging model and an intuitive client API.
 
-https://pulsar.apache.org
+Learn more about Pulsar at https://pulsar.apache.org
 
 ## Main features
 * Horizontally scalable (Millions of independent topics and millions
@@ -58,6 +58,19 @@ Compile and install:
 $ mvn install -DskipTests
 ```
 
+Run Unit Tests:
+
+```bash
+$ mvn test
+```
+
+Run Individual Unit Test:
+
+```bash
+$ cd related-module (e.g: pulsar-client)
+$ mvn test -Dtest=unit-test-name (e.g: ConsumerBuilderImplTest)
+```
+
 Start standalone Pulsar service:
 
 ```bash
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 19f5c6e..0df9a89 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -160,13 +160,14 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     @Override
     public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
         checkArgument(ackTimeout == 0 || timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
-                "Ack timeout should be should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
+                "Ack timeout should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
         conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
         return this;
     }
 
     @Override
     public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
+        checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0");
         conf.setNegativeAckRedeliveryDelayMicros(timeUnit.toMicros(redeliveryDelay));
         return this;
     }
@@ -224,6 +225,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> priorityLevel(int priorityLevel) {
+        checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
         conf.setPriorityLevel(priorityLevel);
         return this;
     }
@@ -249,6 +251,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= 0, "maxTotalReceiverQueueSizeAcrossPartitions needs to be >= 0");
         conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
         return this;
     }
@@ -261,6 +264,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
+        checkArgument(periodInMinutes >= 0, "periodInMinutes needs to be >= 0");
         conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
         return this;
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index daa313c..5d902b1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static org.mockito.Mockito.mock;
@@ -220,4 +221,32 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.topic(TOPIC_NAME).subscriptionTopicsMode(null);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenNegativeAckRedeliveryDelayPropertyIsNegative() {
+        consumerBuilderImpl.negativeAckRedeliveryDelay(-1, TimeUnit.MILLISECONDS);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPriorityLevelPropertyIsNegative() {
+        consumerBuilderImpl.priorityLevel(-1);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenMaxTotalReceiverQueueSizeAcrossPartitionsPropertyIsNegative() {
+        consumerBuilderImpl.maxTotalReceiverQueueSizeAcrossPartitions(-1);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenPatternAutoDiscoveryPeriodPropertyIsNegative() {
+        consumerBuilderImpl.patternAutoDiscoveryPeriod(-1);
+    }
+
+    @Test
+    public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
+        consumerBuilderImpl.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS);
+        consumerBuilderImpl.priorityLevel(1);
+        consumerBuilderImpl.maxTotalReceiverQueueSizeAcrossPartitions(1);
+        consumerBuilderImpl.patternAutoDiscoveryPeriod(1);
+    }
+
 }
diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/io-rabbitmq.md b/site2/website/versioned_docs/version-2.1.0-incubating/io-rabbitmq.md
index c0a8757..74b264b 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/io-rabbitmq.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-rabbitmq.md
@@ -1,13 +1,13 @@
 ---
 id: version-2.1.0-incubating-io-rabbitmq
 title: RabbitMQ Connector
-sidebar_label: RabittMQ Connector
+sidebar_label: RabbitMQ Connector
 original_id: io-rabbitmq
 ---
 
 ## Source
 
-The RabittMQ Source connector is used for receiving messages from a RabittMQ cluster and writing
+The RabbitMQ Source connector is used for receiving messages from a RabbitMQ cluster and writing
 messages to Pulsar topics.
 
 ### Source Configuration Options