You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/31 05:19:05 UTC
[incubator-pulsar] branch master updated: Configure a max
producer/consumer queue size enforced across partitions (#1123)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e783a58 Configure a max producer/consumer queue size enforced across partitions (#1123)
e783a58 is described below
commit e783a58b974253603f5bee72eafc04f08732f660
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 21:19:02 2018 -0800
Configure a max producer/consumer queue size enforced across partitions (#1123)
---
.../include/pulsar/ConsumerConfiguration.h | 15 +++++++++
.../include/pulsar/ProducerConfiguration.h | 16 ++++++++++
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 8 +++++
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 ++-
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 6 ++++
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 5 +++
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 2 +-
pulsar-client-cpp/lib/ProducerConfiguration.cc | 12 ++++++++
pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 2 ++
.../pulsar/client/api/ConsumerConfiguration.java | 33 +++++++++++++++++---
.../pulsar/client/api/ProducerConfiguration.java | 36 +++++++++++++++++-----
.../client/impl/PartitionedConsumerImpl.java | 10 ++++--
.../client/impl/PartitionedProducerImpl.java | 4 +++
13 files changed, 136 insertions(+), 17 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 4ffe6df..2f72eb8 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -97,6 +97,21 @@ class ConsumerConfiguration {
void setReceiverQueueSize(int size);
int getReceiverQueueSize() const;
+ /**
+ * Set the max total receiver queue size across partitons.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+ void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
+
+ /**
+ * @return the configured max total receiver queue size across partitions
+ */
+ int getMaxTotalReceiverQueueSizeAcrossPartitions() const;
+
void setConsumerName(const std::string&);
const std::string& getConsumerName() const;
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 2c745a3..9d2d5f9 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -64,6 +64,22 @@ class ProducerConfiguration {
ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages);
int getMaxPendingMessages() const;
+ /**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+ ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
+
+ /**
+ *
+ * @return the maximum number of pending messages allowed across all the partitions
+ */
+ int getMaxPendingMessagesAcrossPartitions() const;
+
ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode);
PartitionsRoutingMode getPartitionsRoutingMode() const;
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index be19a44..17d2c4b 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -60,6 +60,14 @@ void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueu
int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; }
+void ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions(int size) {
+ impl_->maxTotalReceiverQueueSizeAcrossPartitions = size;
+}
+
+int ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions() const {
+ return impl_->maxTotalReceiverQueueSizeAcrossPartitions;
+}
+
const std::string& ConsumerConfiguration::getConsumerName() const { return impl_->consumerName; }
void ConsumerConfiguration::setConsumerName(const std::string& consumerName) {
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 3f7ad06..1fafc9a 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -29,6 +29,7 @@ struct ConsumerConfigurationImpl {
MessageListener messageListener;
bool hasMessageListener;
int receiverQueueSize;
+ int maxTotalReceiverQueueSizeAcrossPartitions;
std::string consumerName;
long brokerConsumerStatsCacheTimeInMs;
ConsumerConfigurationImpl()
@@ -37,7 +38,8 @@ struct ConsumerConfigurationImpl {
messageListener(),
hasMessageListener(false),
brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds
- receiverQueueSize(1000) {}
+ receiverQueueSize(1000),
+ maxTotalReceiverQueueSizeAcrossPartitions(50000) {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 52a3c46..3d22de9 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -170,6 +170,12 @@ void PartitionedConsumerImpl::start() {
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
config.setMessageListener(
boost::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(), _1, _2));
+
+ // Apply total limit of receiver queue size across partitions
+ config.setReceiverQueueSize(
+ std::min(conf_.getReceiverQueueSize(),
+ (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions_)));
+
// create consumer on each partition
for (unsigned int i = 0; i < numPartitions_; i++) {
std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6a15056..825bcc0 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -46,6 +46,11 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client,
numProducersCreated_ = 0;
cleanup_ = false;
routerPolicy_ = getMessageRouter();
+
+ int maxPendingMessagesPerPartition =
+ std::min(config.getMaxPendingMessages(),
+ (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
+ conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
}
MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index e868b03..b06c592 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -96,7 +96,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
*/
bool cleanup_;
- const ProducerConfiguration conf_;
+ ProducerConfiguration conf_;
typedef std::vector<ProducerImplPtr> ProducerList;
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ae4c61..011d859 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -73,6 +73,18 @@ ProducerConfiguration& ProducerConfiguration::setMaxPendingMessages(int maxPendi
int ProducerConfiguration::getMaxPendingMessages() const { return impl_->maxPendingMessages; }
+ProducerConfiguration& ProducerConfiguration::setMaxPendingMessagesAcrossPartitions(int maxPendingMessages) {
+ if (maxPendingMessages <= 0) {
+ throw "maxPendingMessages needs to be greater than 0";
+ }
+ impl_->maxPendingMessagesAcrossPartitions = maxPendingMessages;
+ return *this;
+}
+
+int ProducerConfiguration::getMaxPendingMessagesAcrossPartitions() const {
+ return impl_->maxPendingMessagesAcrossPartitions;
+}
+
ProducerConfiguration& ProducerConfiguration::setPartitionsRoutingMode(const PartitionsRoutingMode& mode) {
impl_->routingMode = mode;
return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index e3e793d..f211fb7 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -32,6 +32,7 @@ struct ProducerConfigurationImpl {
int sendTimeoutMs;
CompressionType compressionType;
int maxPendingMessages;
+ int maxPendingMessagesAcrossPartitions;
ProducerConfiguration::PartitionsRoutingMode routingMode;
MessageRoutingPolicyPtr messageRouter;
bool blockIfQueueFull;
@@ -43,6 +44,7 @@ struct ProducerConfigurationImpl {
: sendTimeoutMs(30000),
compressionType(CompressionNone),
maxPendingMessages(1000),
+ maxPendingMessagesAcrossPartitions(50000),
routingMode(ProducerConfiguration::UseSinglePartition),
blockIfQueueFull(false),
batchingEnabled(false),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 6d87cfc..e031788 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -48,10 +48,12 @@ public class ConsumerConfiguration implements Serializable {
private int receiverQueueSize = 1000;
+ private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
private String consumerName = null;
private long ackTimeoutMillis = 0;
-
+
private int priorityLevel = 0;
private CryptoKeyReader cryptoKeyReader = null;
@@ -134,6 +136,27 @@ public class ConsumerConfiguration implements Serializable {
return this.receiverQueueSize;
}
+
+ /**
+ * @return the configured max total receiver queue size across partitions
+ */
+ public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+ return maxTotalReceiverQueueSizeAcrossPartitions;
+ }
+
+ /**
+ * Set the max total receiver queue size across partitons.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+ public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+ checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
+ this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
+ }
+
/**
* @return the CryptoKeyReader
*/
@@ -155,7 +178,7 @@ public class ConsumerConfiguration implements Serializable {
/**
* Sets the ConsumerCryptoFailureAction to the value specified
- *
+ *
* @param The consumer action
*/
public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
@@ -218,7 +241,7 @@ public class ConsumerConfiguration implements Serializable {
this.consumerName = consumerName;
return this;
}
-
+
public int getPriorityLevel() {
return priorityLevel;
}
@@ -230,7 +253,7 @@ public class ConsumerConfiguration implements Serializable {
* permits, else broker will consider next priority level consumers. </br>
* If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
* messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
- *
+ *
* <pre>
* Consumer PriorityLevel Permits
* C1 0 2
@@ -240,7 +263,7 @@ public class ConsumerConfiguration implements Serializable {
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
- *
+ *
* @param priorityLevel
*/
public void setPriorityLevel(int priorityLevel) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 9accd61..08b73c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -25,11 +25,9 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
-import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.google.common.base.Objects;
@@ -45,6 +43,7 @@ public class ProducerConfiguration implements Serializable {
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = 1000;
+ private int maxPendingMessagesAcrossPartitions = 50000;
private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMs = 10;
@@ -139,6 +138,27 @@ public class ProducerConfiguration implements Serializable {
/**
*
+ * @return the maximum number of pending messages allowed across all the partitions
+ */
+ public int getMaxPendingMessagesAcrossPartitions() {
+ return maxPendingMessagesAcrossPartitions;
+ }
+
+ /**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+ checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
+ this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+ }
+
+ /**
+ *
* @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
* pending queue is full
*/
@@ -295,18 +315,18 @@ public class ProducerConfiguration implements Serializable {
}
/**
- *
+ *
* @return encryptionKeys
- *
+ *
*/
public ConcurrentOpenHashSet<String> getEncryptionKeys() {
return this.encryptionKeys;
}
/**
- *
+ *
* Returns true if encryption keys are added
- *
+ *
*/
public boolean isEncryptionEnabled() {
return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty();
@@ -352,7 +372,7 @@ public class ProducerConfiguration implements Serializable {
}
/**
- *
+ *
* @return the batch time period in ms.
* @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d7830e..1dd3e4b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import java.util.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -32,7 +34,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
@@ -438,6 +439,11 @@ public class PartitionedConsumerImpl extends ConsumerBase {
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
+
+ int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+ conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
+ internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
+
if (conf.getCryptoKeyReader() != null) {
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5c27754..04269f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -56,6 +56,10 @@ public class PartitionedProducerImpl extends ProducerBase {
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+
+ int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
+ conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
+ conf.setMaxPendingMessages(maxPendingMessages);
start();
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.