You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/31 05:19:05 UTC

[incubator-pulsar] branch master updated: Configure a max producer/consumer queue size enforced across partitions (#1123)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e783a58  Configure a max producer/consumer queue size enforced across partitions (#1123)
e783a58 is described below

commit e783a58b974253603f5bee72eafc04f08732f660
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 21:19:02 2018 -0800

    Configure a max producer/consumer queue size enforced across partitions (#1123)
---
 .../include/pulsar/ConsumerConfiguration.h         | 15 +++++++++
 .../include/pulsar/ProducerConfiguration.h         | 16 ++++++++++
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |  8 +++++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |  4 ++-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  6 ++++
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  5 +++
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |  2 +-
 pulsar-client-cpp/lib/ProducerConfiguration.cc     | 12 ++++++++
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |  2 ++
 .../pulsar/client/api/ConsumerConfiguration.java   | 33 +++++++++++++++++---
 .../pulsar/client/api/ProducerConfiguration.java   | 36 +++++++++++++++++-----
 .../client/impl/PartitionedConsumerImpl.java       | 10 ++++--
 .../client/impl/PartitionedProducerImpl.java       |  4 +++
 13 files changed, 136 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 4ffe6df..2f72eb8 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -97,6 +97,21 @@ class ConsumerConfiguration {
     void setReceiverQueueSize(int size);
     int getReceiverQueueSize() const;
 
+    /**
+     * Set the max total receiver queue size across partitons.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    int getMaxTotalReceiverQueueSizeAcrossPartitions() const;
+
     void setConsumerName(const std::string&);
     const std::string& getConsumerName() const;
 
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 2c745a3..9d2d5f9 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -64,6 +64,22 @@ class ProducerConfiguration {
     ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages);
     int getMaxPendingMessages() const;
 
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
+
+    /**
+     *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    int getMaxPendingMessagesAcrossPartitions() const;
+
     ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode);
     PartitionsRoutingMode getPartitionsRoutingMode() const;
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index be19a44..17d2c4b 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -60,6 +60,14 @@ void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueu
 
 int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; }
 
+void ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions(int size) {
+    impl_->maxTotalReceiverQueueSizeAcrossPartitions = size;
+}
+
+int ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions() const {
+    return impl_->maxTotalReceiverQueueSizeAcrossPartitions;
+}
+
 const std::string& ConsumerConfiguration::getConsumerName() const { return impl_->consumerName; }
 
 void ConsumerConfiguration::setConsumerName(const std::string& consumerName) {
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 3f7ad06..1fafc9a 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -29,6 +29,7 @@ struct ConsumerConfigurationImpl {
     MessageListener messageListener;
     bool hasMessageListener;
     int receiverQueueSize;
+    int maxTotalReceiverQueueSizeAcrossPartitions;
     std::string consumerName;
     long brokerConsumerStatsCacheTimeInMs;
     ConsumerConfigurationImpl()
@@ -37,7 +38,8 @@ struct ConsumerConfigurationImpl {
           messageListener(),
           hasMessageListener(false),
           brokerConsumerStatsCacheTimeInMs(30 * 1000),  // 30 seconds
-          receiverQueueSize(1000) {}
+          receiverQueueSize(1000),
+          maxTotalReceiverQueueSizeAcrossPartitions(50000) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 52a3c46..3d22de9 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -170,6 +170,12 @@ void PartitionedConsumerImpl::start() {
     config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(
         boost::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(), _1, _2));
+
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions_)));
+
     // create consumer on each partition
     for (unsigned int i = 0; i < numPartitions_; i++) {
         std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6a15056..825bcc0 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -46,6 +46,11 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client,
     numProducersCreated_ = 0;
     cleanup_ = false;
     routerPolicy_ = getMessageRouter();
+
+    int maxPendingMessagesPerPartition =
+        std::min(config.getMaxPendingMessages(),
+                 (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
+    conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
 }
 
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index e868b03..b06c592 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -96,7 +96,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
      */
     bool cleanup_;
 
-    const ProducerConfiguration conf_;
+    ProducerConfiguration conf_;
 
     typedef std::vector<ProducerImplPtr> ProducerList;
 
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ae4c61..011d859 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -73,6 +73,18 @@ ProducerConfiguration& ProducerConfiguration::setMaxPendingMessages(int maxPendi
 
 int ProducerConfiguration::getMaxPendingMessages() const { return impl_->maxPendingMessages; }
 
+ProducerConfiguration& ProducerConfiguration::setMaxPendingMessagesAcrossPartitions(int maxPendingMessages) {
+    if (maxPendingMessages <= 0) {
+        throw "maxPendingMessages needs to be greater than 0";
+    }
+    impl_->maxPendingMessagesAcrossPartitions = maxPendingMessages;
+    return *this;
+}
+
+int ProducerConfiguration::getMaxPendingMessagesAcrossPartitions() const {
+    return impl_->maxPendingMessagesAcrossPartitions;
+}
+
 ProducerConfiguration& ProducerConfiguration::setPartitionsRoutingMode(const PartitionsRoutingMode& mode) {
     impl_->routingMode = mode;
     return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index e3e793d..f211fb7 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -32,6 +32,7 @@ struct ProducerConfigurationImpl {
     int sendTimeoutMs;
     CompressionType compressionType;
     int maxPendingMessages;
+    int maxPendingMessagesAcrossPartitions;
     ProducerConfiguration::PartitionsRoutingMode routingMode;
     MessageRoutingPolicyPtr messageRouter;
     bool blockIfQueueFull;
@@ -43,6 +44,7 @@ struct ProducerConfigurationImpl {
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
           maxPendingMessages(1000),
+          maxPendingMessagesAcrossPartitions(50000),
           routingMode(ProducerConfiguration::UseSinglePartition),
           blockIfQueueFull(false),
           batchingEnabled(false),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 6d87cfc..e031788 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -48,10 +48,12 @@ public class ConsumerConfiguration implements Serializable {
 
     private int receiverQueueSize = 1000;
 
+    private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
     private String consumerName = null;
 
     private long ackTimeoutMillis = 0;
-    
+
     private int priorityLevel = 0;
 
     private CryptoKeyReader cryptoKeyReader = null;
@@ -134,6 +136,27 @@ public class ConsumerConfiguration implements Serializable {
         return this.receiverQueueSize;
     }
 
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+        return maxTotalReceiverQueueSizeAcrossPartitions;
+    }
+
+    /**
+     * Set the max total receiver queue size across partitons.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
+        this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
+    }
+
     /**
      * @return the CryptoKeyReader
      */
@@ -155,7 +178,7 @@ public class ConsumerConfiguration implements Serializable {
 
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
-     * 
+     *
      * @param The consumer action
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
@@ -218,7 +241,7 @@ public class ConsumerConfiguration implements Serializable {
         this.consumerName = consumerName;
         return this;
     }
-    
+
     public int getPriorityLevel() {
         return priorityLevel;
     }
@@ -230,7 +253,7 @@ public class ConsumerConfiguration implements Serializable {
      * permits, else broker will consider next priority level consumers. </br>
      * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
      * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
-     * 
+     *
      * <pre>
      * Consumer PriorityLevel Permits
      * C1       0             2
@@ -240,7 +263,7 @@ public class ConsumerConfiguration implements Serializable {
      * C5       1             1
      * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
      * </pre>
-     * 
+     *
      * @param priorityLevel
      */
     public void setPriorityLevel(int priorityLevel) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 9accd61..08b73c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -25,11 +25,9 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
-import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
 import com.google.common.base.Objects;
@@ -45,6 +43,7 @@ public class ProducerConfiguration implements Serializable {
     private long sendTimeoutMs = 30000;
     private boolean blockIfQueueFull = false;
     private int maxPendingMessages = 1000;
+    private int maxPendingMessagesAcrossPartitions = 50000;
     private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
     private MessageRouter customMessageRouter = null;
     private long batchingMaxPublishDelayMs = 10;
@@ -139,6 +138,27 @@ public class ProducerConfiguration implements Serializable {
 
     /**
      *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     *
      * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
      *         pending queue is full
      */
@@ -295,18 +315,18 @@ public class ProducerConfiguration implements Serializable {
     }
 
     /**
-     * 
+     *
      * @return encryptionKeys
-     *  
+     *
      */
     public  ConcurrentOpenHashSet<String> getEncryptionKeys() {
         return this.encryptionKeys;
     }
 
     /**
-     * 
+     *
      * Returns true if encryption keys are added
-     *  
+     *
      */
     public boolean isEncryptionEnabled() {
         return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty();
@@ -352,7 +372,7 @@ public class ProducerConfiguration implements Serializable {
     }
 
     /**
-     * 
+     *
      * @return the batch time period in ms.
      * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d7830e..1dd3e4b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +34,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
@@ -438,6 +439,11 @@ public class PartitionedConsumerImpl extends ConsumerBase {
         internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
+
+        int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
+        internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
+
         if (conf.getCryptoKeyReader() != null) {
             internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
             internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5c27754..04269f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -56,6 +56,10 @@ public class PartitionedProducerImpl extends ProducerBase {
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+
+        int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
+                conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
+        conf.setMaxPendingMessages(maxPendingMessages);
         start();
     }
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.