You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/31 05:19:07 UTC

[GitHub] merlimat closed pull request #1123: Configure a max producer/consumer queue size enforced across partitions

merlimat closed pull request #1123: Configure a max producer/consumer queue size enforced across partitions
URL: https://github.com/apache/incubator-pulsar/pull/1123
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 4ffe6df60..2f72eb8ae 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -97,6 +97,21 @@ class ConsumerConfiguration {
     void setReceiverQueueSize(int size);
     int getReceiverQueueSize() const;
 
+    /**
+     * Set the max total receiver queue size across partitions.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    int getMaxTotalReceiverQueueSizeAcrossPartitions() const;
+
     void setConsumerName(const std::string&);
     const std::string& getConsumerName() const;
 
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 2c745a3ca..9d2d5f9bb 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -64,6 +64,22 @@ class ProducerConfiguration {
     ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages);
     int getMaxPendingMessages() const;
 
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
+
+    /**
+     *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    int getMaxPendingMessagesAcrossPartitions() const;
+
     ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode);
     PartitionsRoutingMode getPartitionsRoutingMode() const;
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index be19a44d1..17d2c4b7a 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -60,6 +60,14 @@ void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueu
 
 int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; }
 
+void ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions(int size) {
+    impl_->maxTotalReceiverQueueSizeAcrossPartitions = size;
+}
+
+int ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions() const {
+    return impl_->maxTotalReceiverQueueSizeAcrossPartitions;
+}
+
 const std::string& ConsumerConfiguration::getConsumerName() const { return impl_->consumerName; }
 
 void ConsumerConfiguration::setConsumerName(const std::string& consumerName) {
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 3f7ad06c9..1fafc9aa7 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -29,6 +29,7 @@ struct ConsumerConfigurationImpl {
     MessageListener messageListener;
     bool hasMessageListener;
     int receiverQueueSize;
+    int maxTotalReceiverQueueSizeAcrossPartitions;
     std::string consumerName;
     long brokerConsumerStatsCacheTimeInMs;
     ConsumerConfigurationImpl()
@@ -37,7 +38,8 @@ struct ConsumerConfigurationImpl {
           messageListener(),
           hasMessageListener(false),
           brokerConsumerStatsCacheTimeInMs(30 * 1000),  // 30 seconds
-          receiverQueueSize(1000) {}
+          receiverQueueSize(1000),
+          maxTotalReceiverQueueSizeAcrossPartitions(50000) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 52a3c4670..3d22de98f 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -170,6 +170,12 @@ void PartitionedConsumerImpl::start() {
     config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(
         boost::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(), _1, _2));
+
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions_)));
+
     // create consumer on each partition
     for (unsigned int i = 0; i < numPartitions_; i++) {
         std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6a15056c5..825bcc0ff 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -46,6 +46,11 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client,
     numProducersCreated_ = 0;
     cleanup_ = false;
     routerPolicy_ = getMessageRouter();
+
+    int maxPendingMessagesPerPartition =
+        std::min(config.getMaxPendingMessages(),
+                 (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
+    conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
 }
 
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index e868b03a5..b06c5926b 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -96,7 +96,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
      */
     bool cleanup_;
 
-    const ProducerConfiguration conf_;
+    ProducerConfiguration conf_;
 
     typedef std::vector<ProducerImplPtr> ProducerList;
 
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ae4c6106..011d85994 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -73,6 +73,18 @@ ProducerConfiguration& ProducerConfiguration::setMaxPendingMessages(int maxPendi
 
 int ProducerConfiguration::getMaxPendingMessages() const { return impl_->maxPendingMessages; }
 
+ProducerConfiguration& ProducerConfiguration::setMaxPendingMessagesAcrossPartitions(int maxPendingMessages) {
+    if (maxPendingMessages <= 0) {
+        throw "maxPendingMessages needs to be greater than 0";
+    }
+    impl_->maxPendingMessagesAcrossPartitions = maxPendingMessages;
+    return *this;
+}
+
+int ProducerConfiguration::getMaxPendingMessagesAcrossPartitions() const {
+    return impl_->maxPendingMessagesAcrossPartitions;
+}
+
 ProducerConfiguration& ProducerConfiguration::setPartitionsRoutingMode(const PartitionsRoutingMode& mode) {
     impl_->routingMode = mode;
     return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index e3e793d8b..f211fb7cd 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -32,6 +32,7 @@ struct ProducerConfigurationImpl {
     int sendTimeoutMs;
     CompressionType compressionType;
     int maxPendingMessages;
+    int maxPendingMessagesAcrossPartitions;
     ProducerConfiguration::PartitionsRoutingMode routingMode;
     MessageRoutingPolicyPtr messageRouter;
     bool blockIfQueueFull;
@@ -43,6 +44,7 @@ struct ProducerConfigurationImpl {
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
           maxPendingMessages(1000),
+          maxPendingMessagesAcrossPartitions(50000),
           routingMode(ProducerConfiguration::UseSinglePartition),
           blockIfQueueFull(false),
           batchingEnabled(false),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 6d87cfcd4..e0317885e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -48,10 +48,12 @@
 
     private int receiverQueueSize = 1000;
 
+    private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
     private String consumerName = null;
 
     private long ackTimeoutMillis = 0;
-    
+
     private int priorityLevel = 0;
 
     private CryptoKeyReader cryptoKeyReader = null;
@@ -134,6 +136,27 @@ public int getReceiverQueueSize() {
         return this.receiverQueueSize;
     }
 
+
+    /**
+     * @return the configured max total receiver queue size across partitions
+     */
+    public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+        return maxTotalReceiverQueueSizeAcrossPartitions;
+    }
+
+    /**
+     * Set the max total receiver queue size across partitions.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for individual partitions
+     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
+        this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
+    }
+
     /**
      * @return the CryptoKeyReader
      */
@@ -155,7 +178,7 @@ public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
 
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
-     * 
+     *
      * @param The consumer action
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
@@ -218,7 +241,7 @@ public ConsumerConfiguration setConsumerName(String consumerName) {
         this.consumerName = consumerName;
         return this;
     }
-    
+
     public int getPriorityLevel() {
         return priorityLevel;
     }
@@ -230,7 +253,7 @@ public int getPriorityLevel() {
      * permits, else broker will consider next priority level consumers. </br>
      * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
      * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
-     * 
+     *
      * <pre>
      * Consumer PriorityLevel Permits
      * C1       0             2
@@ -240,7 +263,7 @@ public int getPriorityLevel() {
      * C5       1             1
      * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
      * </pre>
-     * 
+     *
      * @param priorityLevel
      */
     public void setPriorityLevel(int priorityLevel) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 9accd6136..08b73c296 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -25,11 +25,9 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
-import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
 import com.google.common.base.Objects;
@@ -45,6 +43,7 @@
     private long sendTimeoutMs = 30000;
     private boolean blockIfQueueFull = false;
     private int maxPendingMessages = 1000;
+    private int maxPendingMessagesAcrossPartitions = 50000;
     private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
     private MessageRouter customMessageRouter = null;
     private long batchingMaxPublishDelayMs = 10;
@@ -137,6 +136,27 @@ public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
         return this;
     }
 
+    /**
+     *
+     * @return the maximum number of pending messages allowed across all the partitions
+     */
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each partition
+     * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
     /**
      *
      * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
@@ -295,18 +315,18 @@ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
     }
 
     /**
-     * 
+     *
      * @return encryptionKeys
-     *  
+     *
      */
     public  ConcurrentOpenHashSet<String> getEncryptionKeys() {
         return this.encryptionKeys;
     }
 
     /**
-     * 
+     *
      * Returns true if encryption keys are added
-     *  
+     *
      */
     public boolean isEncryptionEnabled() {
         return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty();
@@ -352,7 +372,7 @@ public ProducerCryptoFailureAction getCryptoFailureAction() {
     }
 
     /**
-     * 
+     *
      * @return the batch time period in ms.
      * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d7830e2a..1dd3e4b69 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +34,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
@@ -438,6 +439,11 @@ private ConsumerConfiguration getInternalConsumerConfig() {
         internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
+
+        int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
+        internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
+
         if (conf.getCryptoKeyReader() != null) {
             internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
             internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5c27754f8..04269f36c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -56,6 +56,10 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+
+        int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
+                conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
+        conf.setMaxPendingMessages(maxPendingMessages);
         start();
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services