You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/11/02 03:27:08 UTC
[pulsar] branch branch-2.6 updated: [Client]Add
autoPartitionsUpdateInterval for producer and consumer (#7840)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 6db019e [Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
6db019e is described below
commit 6db019e91cebd04402dd887ebd15ef849e00dc10
Author: Aaron Robert <Ro...@outlook.com>
AuthorDate: Fri Aug 21 22:16:14 2020 +0800
[Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
Motivation
Add auto partitions update interval setting for producer and consumer.
Modifications
add autoUpdatePartitionsInterval to partitioned producer and consumer
(cherry picked from commit 7d9319d844f21ed104c502bcc6c2b853b10ddc80)
---
.../java/org/apache/pulsar/client/api/ConsumerBuilder.java | 12 ++++++++++++
.../java/org/apache/pulsar/client/api/ProducerBuilder.java | 12 ++++++++++++
.../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 6 ++++++
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +-
.../apache/pulsar/client/impl/PartitionedProducerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/ProducerBuilderImpl.java | 6 ++++++
.../pulsar/client/impl/conf/ConsumerConfigurationData.java | 7 +++++++
.../pulsar/client/impl/conf/ProducerConfigurationData.java | 6 ++++++
.../pulsar/client/impl/conf/ConfigurationDataUtilsTest.java | 5 +++++
9 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 9bb8ce0..ac26df1 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -553,6 +553,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
+ * Set the interval of updating partitions <i>(default: 1 minute)</i>. This only works if autoUpdatePartitions is
+ * enabled.
+ *
+ * @param interval
+ * the interval of updating partitions
+ * @param unit
+ * the time unit of the interval.
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+
+ /**
* Set KeyShared subscription policy for consumer.
*
* <p>By default, KeyShared subscription use auto split hash range to maintain consumers. If you want to
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index f94e6c2..0ca4941 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -504,6 +504,18 @@ public interface ProducerBuilder<T> extends Cloneable {
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
+ * Set the interval of updating partitions <i>(default: 1 minute)</i>. This only works if autoUpdatePartitions is
+ * enabled.
+ *
+ * @param interval
+ * the interval of updating partitions
+ * @param unit
+ * the time unit of the interval.
+ * @return the producer builder instance
+ */
+ ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+
+ /**
* Control whether enable the multiple schema mode for producer.
* If enabled, producer can send a message with different schema from that specified just when it is created,
* otherwise a invalid message exception would be threw
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 64eadfe..2cf8b03 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -384,6 +384,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+ conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> startMessageIdInclusive() {
conf.setResetIncludeHead(true);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 67f09c7..8be9b8a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -137,7 +137,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+ .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
if (conf.getTopicNames().isEmpty()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 765524c..a8a5c83 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -80,7 +80,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+ .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9aaec55..64a22d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -295,6 +295,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
+ public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+ conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+ return this;
+ }
+
+ @Override
public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
conf.setMultiSchema(multiSchema);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 14595e6..7f5d65b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -119,6 +119,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
+ private long autoUpdatePartitionsIntervalSeconds = 60;
+
private boolean replicateSubscriptionState = false;
private boolean resetIncludeHead = false;
@@ -127,6 +129,11 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean batchIndexAckEnabled = false;
+ public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
+ checkArgument(interval > 0, "interval needs to be > 0");
+ this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
+ }
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index fd2f678..c48598a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -95,6 +95,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
+ private long autoUpdatePartitionsIntervalSeconds = 60;
+
private boolean multiSchema = true;
private SortedMap<String, String> properties = new TreeMap<>();
@@ -163,4 +165,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
}
+ public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
+ checkArgument(interval > 0, "interval needs to be > 0");
+ this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 9634a87..a1bafbd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -62,6 +63,7 @@ public class ConfigurationDataUtilsTest {
confData.setProducerName("unset");
confData.setBatchingEnabled(true);
confData.setBatchingMaxMessages(1234);
+ confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("producerName", "test-producer");
config.put("batchingEnabled", false);
@@ -70,6 +72,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-producer", confData.getProducerName());
assertFalse(confData.isBatchingEnabled());
assertEquals(1234, confData.getBatchingMaxMessages());
+ assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
@@ -78,6 +81,7 @@ public class ConfigurationDataUtilsTest {
confData.setSubscriptionName("unknown-subscription");
confData.setPriorityLevel(10000);
confData.setConsumerName("unknown-consumer");
+ confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("subscriptionName", "test-subscription");
config.put("priorityLevel", 100);
@@ -85,6 +89,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-subscription", confData.getSubscriptionName());
assertEquals(100, confData.getPriorityLevel());
assertEquals("unknown-consumer", confData.getConsumerName());
+ assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test