You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/12/07 15:00:58 UTC
[pulsar] branch master updated: [pulsar-java-client] Decouple
partition switch frequency with maxPublishDelay in producer configuration
(#5788)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c47ba8 [pulsar-java-client] Decouple partition switch frequency with maxPublishDelay in producer configuration (#5788)
6c47ba8 is described below
commit 6c47ba8bee66ade007a69ad05c80c8ba967c0306
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Dec 7 23:00:47 2019 +0800
[pulsar-java-client] Decouple partition switch frequency with maxPublishDelay in producer configuration (#5788)
---
.../java/org/apache/pulsar/client/api/ProducerBuilder.java | 14 ++++++++++++++
.../apache/pulsar/client/impl/PartitionedProducerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/ProducerBuilderImpl.java | 6 ++++++
.../client/impl/RoundRobinPartitionMessageRouterImpl.java | 14 +++++++-------
.../pulsar/client/impl/conf/ProducerConfigurationData.java | 10 ++++++++++
site2/docs/client-libraries-java.md | 1 +
6 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 970a401..c4a250e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -350,6 +350,20 @@ public interface ProducerBuilder<T> extends Cloneable {
ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);
/**
+ * Set the partition switch frequency used when batching is enabled and the round-robin routing mode
+ * is applied to non-keyed messages. <i>default: 10</i>
+ *
+ * <p>The partition switch period is {@code frequency * batchingMaxPublishDelay}. During this period,
+ * all arriving messages are routed to the same partition.
+ *
+ * @param frequency the frequency of partition switch; must be at least 1
+ * @return the producer builder instance
+ * @see #messageRoutingMode(MessageRoutingMode)
+ * @see #batchingMaxPublishDelay(long, TimeUnit)
+ */
+ ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency);
+
+ /**
* Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1,
* messages will be queued until this threshold is reached or batch interval has elapsed.
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 7803447..dd7b29b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -103,7 +103,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
conf.getHashingScheme(),
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),
conf.isBatchingEnabled(),
- TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros()));
+ TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));
}
return messageRouter;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 5d617f9..ef77af6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -209,6 +209,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
+ public ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency) {
+ conf.setBatchingPartitionSwitchFrequencyByPublishDelay(frequency);
+ return this;
+ }
+
+ @Override
public ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index ee8b9a0..d293ad4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -46,7 +46,7 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
private final int startPtnIdx;
private final boolean isBatchingEnabled;
- private final long maxBatchingDelayMs;
+ private final long partitionSwitchMs;
private final Clock clock;
@@ -55,20 +55,20 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
int startPtnIdx,
boolean isBatchingEnabled,
- long maxBatchingDelayMs) {
- this(hashingScheme, startPtnIdx, isBatchingEnabled, maxBatchingDelayMs, SYSTEM_CLOCK);
+ long partitionSwitchMs) {
+ this(hashingScheme, startPtnIdx, isBatchingEnabled, partitionSwitchMs, SYSTEM_CLOCK);
}
public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
int startPtnIdx,
boolean isBatchingEnabled,
- long maxBatchingDelayMs,
+ long partitionSwitchMs,
Clock clock) {
super(hashingScheme);
PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
this.startPtnIdx = startPtnIdx;
this.isBatchingEnabled = isBatchingEnabled;
- this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+ this.partitionSwitchMs = Math.max(1, partitionSwitchMs);
this.clock = clock;
}
@@ -79,9 +79,9 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
- if (isBatchingEnabled) { // if batching is enabled, choose partition on `maxBatchingDelayMs` boundary.
+ if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
- return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, topicMetadata.numPartitions());
+ return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index ee9bbd9..654c15f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -70,6 +70,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+ private int batchingPartitionSwitchFrequencyByPublishDelay = 10;
private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
private int batchingMaxBytes = 128 * 1024; // 128KB (keep the maximum consistent as previous versions)
private boolean batchingEnabled = true; // enabled by default
@@ -148,4 +149,13 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
this.batchingMaxPublishDelayMicros = timeUnit.toMicros(batchDelay);
}
+ public void setBatchingPartitionSwitchFrequencyByPublishDelay(int frequencyByPublishDelay) {
+ checkArgument(frequencyByPublishDelay >= 1, "configured value for partition switch frequency must be >= 1");
+ this.batchingPartitionSwitchFrequencyByPublishDelay = frequencyByPublishDelay;
+ }
+
+ public long batchingPartitionSwitchFrequencyIntervalMicros() {
+ return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
+ }
+
}
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 2de8b64..592e5f1 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -180,6 +180,7 @@ MessageRoutingMode|`messageRoutingMode`|Message routing logic for producers on [
HashingScheme|`hashingScheme`|Hashing function that determines the partition on which a particular message is published (**partitioned topics only**).<br/><br/>Below are the available options:<br/><br/><li> `pulsar.JavaStringHash`: the equivalent of `String.hashCode()` in Java<br/><br/><li> `pulsar.Murmur3_32Hash`: applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash) hashing function<br/><br/><li>`pulsar.BoostHash`: applies the hashing function from C++'s [Boost](https://www.b [...]
ProducerCryptoFailureAction|`cryptoFailureAction`|Producer should take action when encryption fails.<br/><br/><li>**FAIL**: if encryption fails, unencrypted messages fail to send.</li><br/><li> **SEND**: if encryption fails, unencrypted messages are sent. |`ProducerCryptoFailureAction.FAIL`
long|`batchingMaxPublishDelayMicros`|Time period within which messages sent will be batched.|TimeUnit.MILLISECONDS.toMicros(1)
+int|`batchingPartitionSwitchFrequencyByPublishDelay`|Partition switch frequency used when batching is enabled and the round-robin routing mode is applied to non-keyed messages. The switch period is `batchingPartitionSwitchFrequencyByPublishDelay` * `batchingMaxPublishDelayMicros`; the value must be at least 1.|10
int|batchingMaxMessages|Maximum number of messages permitted in a batch.|1000
boolean|`batchingEnabled`|Enable batching of messages. |true
CompressionType|`compressionType`|Message data compression type used by a producer. <br/><br/>Below are the available options:<li>[`LZ4`](https://github.com/lz4/lz4)<br/><li>[`ZLIB`](https://zlib.net/)<br/><li>[`ZSTD`](https://facebook.github.io/zstd/)<br/><li>[`SNAPPY`](https://google.github.io/snappy/)| No compression