You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/09/24 06:02:09 UTC
[rocketmq-client-go] branch master updated: Add set maxcachesize
api to support simple flow-control
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5c26b Add set maxcachesize api to support simple flow-control
9d5c26b is described below
commit 9d5c26baae62c1265b43d9914e889516dcf24cc2
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Sep 24 11:51:50 2019 +0800
Add set maxcachesize api to support simple flow-control
---
core/api.go | 18 ++++++++++++++----
core/push_consumer.go | 14 ++++++++++++++
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/core/api.go b/core/api.go
index f606b0d..152deae 100644
--- a/core/api.go
+++ b/core/api.go
@@ -160,10 +160,12 @@ func (mode ConsumerModel) String() string {
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
ClientConfig
- ThreadCount int
- MessageBatchMaxSize int
- Model MessageModel
- ConsumerModel ConsumerModel
+ ThreadCount int
+ MessageBatchMaxSize int
+ Model MessageModel
+ ConsumerModel ConsumerModel
+ MaxCacheMessageSize int
+ MaxCacheMessageSizeInMB int
}
func (config *PushConsumerConfig) String() string {
@@ -185,6 +187,14 @@ func (config *PushConsumerConfig) String() string {
if config.ConsumerModel != 0 {
str = strJoin(str, "ConsumerModel", config.ConsumerModel.String())
}
+
+ if config.MaxCacheMessageSize != 0 {
+ str = strJoin(str, "MaxCacheMessageSize", config.MaxCacheMessageSize)
+ }
+
+ if config.MaxCacheMessageSizeInMB != 0 {
+ str = strJoin(str, "MaxCacheMessageSizeInMB", config.MaxCacheMessageSizeInMB)
+ }
return str + "]"
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 29da7fb..017a733 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -167,6 +167,20 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
}
}
+ if config.MaxCacheMessageSize > 0 {
+ err = rmqError(C.SetPushConsumerMaxCacheMessageSize(cconsumer, C.int(config.MaxCacheMessageSize)))
+ if err != NIL {
+ return nil, err
+ }
+ }
+
+ if config.MaxCacheMessageSizeInMB > 0 {
+ err = rmqError(C.SetPushConsumerMaxCacheMessageSizeInMb(cconsumer, C.int(config.MaxCacheMessageSizeInMB)))
+ if err != NIL {
+ return nil, err
+ }
+ }
+
if config.Model != 0 {
var mode C.CMessageModel
switch config.Model {