You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/09/24 06:02:09 UTC

[rocketmq-client-go] branch master updated: Add set maxcachesize api to support simple flow-control

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5c26b  Add set maxcachesize api to support simple flow-control
9d5c26b is described below

commit 9d5c26baae62c1265b43d9914e889516dcf24cc2
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Sep 24 11:51:50 2019 +0800

    Add set maxcachesize api to support simple flow-control
---
 core/api.go           | 18 ++++++++++++++----
 core/push_consumer.go | 14 ++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/core/api.go b/core/api.go
index f606b0d..152deae 100644
--- a/core/api.go
+++ b/core/api.go
@@ -160,10 +160,12 @@ func (mode ConsumerModel) String() string {
 // PushConsumerConfig define a new consumer.
 type PushConsumerConfig struct {
 	ClientConfig
-	ThreadCount         int
-	MessageBatchMaxSize int
-	Model               MessageModel
-	ConsumerModel       ConsumerModel
+	ThreadCount             int
+	MessageBatchMaxSize     int
+	Model                   MessageModel
+	ConsumerModel           ConsumerModel
+	MaxCacheMessageSize     int
+	MaxCacheMessageSizeInMB int
 }
 
 func (config *PushConsumerConfig) String() string {
@@ -185,6 +187,14 @@ func (config *PushConsumerConfig) String() string {
 	if config.ConsumerModel != 0 {
 		str = strJoin(str, "ConsumerModel", config.ConsumerModel.String())
 	}
+
+	if config.MaxCacheMessageSize != 0 {
+		str = strJoin(str, "MaxCacheMessageSize", config.MaxCacheMessageSize)
+	}
+
+	if config.MaxCacheMessageSizeInMB != 0 {
+		str = strJoin(str, "MaxCacheMessageSizeInMB", config.MaxCacheMessageSizeInMB)
+	}
 	return str + "]"
 }
 
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 29da7fb..017a733 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -167,6 +167,20 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		}
 	}
 
+	if config.MaxCacheMessageSize > 0 {
+		err = rmqError(C.SetPushConsumerMaxCacheMessageSize(cconsumer, C.int(config.MaxCacheMessageSize)))
+		if err != NIL {
+			return nil, err
+		}
+	}
+
+	if config.MaxCacheMessageSizeInMB > 0 {
+		err = rmqError(C.SetPushConsumerMaxCacheMessageSizeInMb(cconsumer, C.int(config.MaxCacheMessageSizeInMB)))
+		if err != NIL {
+			return nil, err
+		}
+	}
+
 	if config.Model != 0 {
 		var mode C.CMessageModel
 		switch config.Model {