You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/12/07 15:00:58 UTC

[pulsar] branch master updated: [pulsar-java-client] Decouple partition switch frequency with maxPublishDelay in producer configuration (#5788)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c47ba8  [pulsar-java-client] Decouple partition switch frequency with maxPublishDelay in producer configuration (#5788)
6c47ba8 is described below

commit 6c47ba8bee66ade007a69ad05c80c8ba967c0306
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Dec 7 23:00:47 2019 +0800

    [pulsar-java-client] Decouple partition switch frequency with maxPublishDelay in producer configuration (#5788)
---
 .../java/org/apache/pulsar/client/api/ProducerBuilder.java | 14 ++++++++++++++
 .../apache/pulsar/client/impl/PartitionedProducerImpl.java |  2 +-
 .../org/apache/pulsar/client/impl/ProducerBuilderImpl.java |  6 ++++++
 .../client/impl/RoundRobinPartitionMessageRouterImpl.java  | 14 +++++++-------
 .../pulsar/client/impl/conf/ProducerConfigurationData.java | 10 ++++++++++
 site2/docs/client-libraries-java.md                        |  1 +
 6 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 970a401..c4a250e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -350,6 +350,20 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);
 
     /**
+     * Set the partition switch frequency used when batching of messages is enabled and
+     * round-robin routing mode is used for non-keyed messages. <i>default: 10</i>
+     *
+     * <p>The partition switch period is {@code frequency * batchingMaxPublishDelay}. During this period,
+     * all arriving messages are routed to the same partition.
+     *
+     * @param frequency the number of batchingMaxPublishDelay intervals between partition switches; must be >= 1
+     * @return the producer builder instance
+     * @see #messageRoutingMode(MessageRoutingMode)
+     * @see #batchingMaxPublishDelay(long, TimeUnit)
+     */
+    ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency);
+
+    /**
      * Set the maximum number of messages permitted in a batch. <i>default: 1000</i> If set to a value greater than 1,
      * messages will be queued until this threshold is reached or batch interval has elapsed.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 7803447..dd7b29b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -103,7 +103,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                         conf.getHashingScheme(),
                         ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),
                         conf.isBatchingEnabled(),
-                        TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros()));
+                        TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));
         }
 
         return messageRouter;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 5d617f9..ef77af6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -209,6 +209,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
     @Override
+    public ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency) {
+        conf.setBatchingPartitionSwitchFrequencyByPublishDelay(frequency);
+        return this;
+    }
+
+    @Override
     public ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
         conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index ee8b9a0..d293ad4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -46,7 +46,7 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
 
     private final int startPtnIdx;
     private final boolean isBatchingEnabled;
-    private final long maxBatchingDelayMs;
+    private final long partitionSwitchMs;
 
     private final Clock clock;
 
@@ -55,20 +55,20 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
                                                 int startPtnIdx,
                                                 boolean isBatchingEnabled,
-                                                long maxBatchingDelayMs) {
-        this(hashingScheme, startPtnIdx, isBatchingEnabled, maxBatchingDelayMs, SYSTEM_CLOCK);
+                                                long partitionSwitchMs) {
+        this(hashingScheme, startPtnIdx, isBatchingEnabled, partitionSwitchMs, SYSTEM_CLOCK);
     }
 
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
                                                 int startPtnIdx,
                                                 boolean isBatchingEnabled,
-                                                long maxBatchingDelayMs,
+                                                long partitionSwitchMs,
                                                 Clock clock) {
         super(hashingScheme);
         PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
         this.startPtnIdx = startPtnIdx;
         this.isBatchingEnabled = isBatchingEnabled;
-        this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+        this.partitionSwitchMs = Math.max(1, partitionSwitchMs);
         this.clock = clock;
     }
 
@@ -79,9 +79,9 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
             return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
         }
 
-        if (isBatchingEnabled) { // if batching is enabled, choose partition on `maxBatchingDelayMs` boundary.
+        if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
             long currentMs = clock.millis();
-            return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, topicMetadata.numPartitions());
+            return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
         } else {
             return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index ee9bbd9..654c15f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -70,6 +70,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     private MessageRouter customMessageRouter = null;
 
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+    private int batchingPartitionSwitchFrequencyByPublishDelay = 10;
     private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
     private int batchingMaxBytes = 128 * 1024; // 128KB (keep the maximum consistent as previous versions)
     private boolean batchingEnabled = true; // enabled by default
@@ -148,4 +149,13 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
         this.batchingMaxPublishDelayMicros = timeUnit.toMicros(batchDelay);
     }
 
+    public void setBatchingPartitionSwitchFrequencyByPublishDelay(int frequencyByPublishDelay) {
+        checkArgument(frequencyByPublishDelay >= 1, "configured value for partition switch frequency must be >= 1");
+        this.batchingPartitionSwitchFrequencyByPublishDelay = frequencyByPublishDelay;
+    }
+
+    public long batchingPartitionSwitchFrequencyIntervalMicros() {
+        return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
+    }
+
 }
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 2de8b64..592e5f1 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -180,6 +180,7 @@ MessageRoutingMode|`messageRoutingMode`|Message routing logic for producers on [
 HashingScheme|`hashingScheme`|Hashing function that determines the partition on which a particular message is published (**partitioned topics only**).<br/><br/>Below are the available options:<br/><br/><li> `pulsar.JavaStringHash`: the equivalent of `String.hashCode()` in Java<br/><br/><li> `pulsar.Murmur3_32Hash`: applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash) hashing function<br/><br/><li>`pulsar.BoostHash`: applies the hashing function from C++'s [Boost](https://www.b [...]
 ProducerCryptoFailureAction|`cryptoFailureAction`|Producer should take action when encryption fails.<br/><br/><li>**FAIL**: if encryption fails, unencrypted messages fail to send.</li><br/><li> **SEND**: if encryption fails, unencrypted messages are sent. |`ProducerCryptoFailureAction.FAIL`
 long|`batchingMaxPublishDelayMicros`|Time period within which messages sent will be batched.|TimeUnit.MILLISECONDS.toMicros(1)
+int|`batchingPartitionSwitchFrequencyByPublishDelay`|Partition switch frequency for round-robin routing mode when batching is enabled. The partition switch period is `batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros`.|10
 int|batchingMaxMessages|Maximum number of messages permitted in a batch.|1000
 boolean|`batchingEnabled`|Enable batching of messages. |true
 CompressionType|`compressionType`|Message data compression type used by a producer. <br/><br/>Below are the available options:<li>[`LZ4`](https://github.com/lz4/lz4)<br/><li>[`ZLIB`](https://zlib.net/)<br/><li>[`ZSTD`](https://facebook.github.io/zstd/)<br/><li>[`SNAPPY`](https://google.github.io/snappy/)| No compression