You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/11/02 03:27:08 UTC

[pulsar] branch branch-2.6 updated: [Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 6db019e  [Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
6db019e is described below

commit 6db019e91cebd04402dd887ebd15ef849e00dc10
Author: Aaron Robert <Ro...@outlook.com>
AuthorDate: Fri Aug 21 22:16:14 2020 +0800

    [Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
    
    Motivation
    Add auto partitions update interval setting for producer and consumer.
    
    Modifications
    add autoUpdatePartitionsInterval to partitioned producer and consumer
    
    (cherry picked from commit 7d9319d844f21ed104c502bcc6c2b853b10ddc80)
---
 .../java/org/apache/pulsar/client/api/ConsumerBuilder.java   | 12 ++++++++++++
 .../java/org/apache/pulsar/client/api/ProducerBuilder.java   | 12 ++++++++++++
 .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java   |  6 ++++++
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   |  2 +-
 .../apache/pulsar/client/impl/PartitionedProducerImpl.java   |  2 +-
 .../org/apache/pulsar/client/impl/ProducerBuilderImpl.java   |  6 ++++++
 .../pulsar/client/impl/conf/ConsumerConfigurationData.java   |  7 +++++++
 .../pulsar/client/impl/conf/ProducerConfigurationData.java   |  6 ++++++
 .../pulsar/client/impl/conf/ConfigurationDataUtilsTest.java  |  5 +++++
 9 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 9bb8ce0..ac26df1 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -553,6 +553,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 
     /**
+     * Set the interval between partition updates <i>(default: 1 minute)</i>. This only takes effect if
+     * autoUpdatePartitions is enabled.
+     *
+     * @param interval
+     *            the interval between partition updates (must be &gt; 0; stored as whole seconds)
+     * @param unit
+     *            the time unit of the interval.
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+
+    /**
      * Set KeyShared subscription policy for consumer.
      *
      * <p>By default, KeyShared subscription use auto split hash range to maintain consumers. If you want to
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index f94e6c2..0ca4941 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -504,6 +504,18 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 
     /**
+     * Set the interval between partition updates <i>(default: 1 minute)</i>. This only takes effect if
+     * autoUpdatePartitions is enabled.
+     *
+     * @param interval
+     *            the interval between partition updates (must be &gt; 0; stored as whole seconds)
+     * @param unit
+     *            the time unit of the interval.
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+
+    /**
      * Control whether enable the multiple schema mode for producer.
      * If enabled, producer can send a message with different schema from that specified just when it is created,
      * otherwise a invalid message exception would be threw
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 64eadfe..2cf8b03 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -384,6 +384,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+        conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+        return this;
+    }
+
+    @Override
 
     public ConsumerBuilder<T> startMessageIdInclusive() {
         conf.setResetIncludeHead(true);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 67f09c7..8be9b8a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -137,7 +137,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         if (conf.isAutoUpdatePartitions()) {
             topicsPartitionChangedListener = new TopicsPartitionChangedListener();
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
 
         if (conf.getTopicNames().isEmpty()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 765524c..a8a5c83 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -80,7 +80,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         if (conf.isAutoUpdatePartitions()) {
             topicsPartitionChangedListener = new TopicsPartitionChangedListener();
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9aaec55..64a22d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -295,6 +295,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
     @Override
+    public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+        conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+        return this;
+    }
+
+    @Override
     public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
         conf.setMultiSchema(multiSchema);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 14595e6..7f5d65b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -119,6 +119,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean autoUpdatePartitions = true;
 
+    private long autoUpdatePartitionsIntervalSeconds = 60;
+
     private boolean replicateSubscriptionState = false;
 
     private boolean resetIncludeHead = false;
@@ -127,6 +129,11 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean batchIndexAckEnabled = false;
 
+    public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
+        checkArgument(interval > 0, "interval needs to be > 0");
+        this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
+    }
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index fd2f678..c48598a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -95,6 +95,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
 
     private boolean autoUpdatePartitions = true;
 
+    private long autoUpdatePartitionsIntervalSeconds = 60;
+
     private boolean multiSchema = true;
 
     private SortedMap<String, String> properties = new TreeMap<>();
@@ -163,4 +165,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
         return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
     }
 
+    public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
+        checkArgument(interval > 0, "interval needs to be > 0");
+        this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 9634a87..a1bafbd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -62,6 +63,7 @@ public class ConfigurationDataUtilsTest {
         confData.setProducerName("unset");
         confData.setBatchingEnabled(true);
         confData.setBatchingMaxMessages(1234);
+        confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
         Map<String, Object> config = new HashMap<>();
         config.put("producerName", "test-producer");
         config.put("batchingEnabled", false);
@@ -70,6 +72,7 @@ public class ConfigurationDataUtilsTest {
         assertEquals("test-producer", confData.getProducerName());
         assertFalse(confData.isBatchingEnabled());
         assertEquals(1234, confData.getBatchingMaxMessages());
+        assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
     }
 
     @Test
@@ -78,6 +81,7 @@ public class ConfigurationDataUtilsTest {
         confData.setSubscriptionName("unknown-subscription");
         confData.setPriorityLevel(10000);
         confData.setConsumerName("unknown-consumer");
+        confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
         Map<String, Object> config = new HashMap<>();
         config.put("subscriptionName", "test-subscription");
         config.put("priorityLevel", 100);
@@ -85,6 +89,7 @@ public class ConfigurationDataUtilsTest {
         assertEquals("test-subscription", confData.getSubscriptionName());
         assertEquals(100, confData.getPriorityLevel());
         assertEquals("unknown-consumer", confData.getConsumerName());
+        assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
     }
 
     @Test