You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 06:29:23 UTC
[rocketmq-client-go] branch master updated: [ISSUE #728] client shutdown abnormally (#729)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9ce8e66 [ISSUE #728] client shutdown abnormally (#729)
9ce8e66 is described below
commit 9ce8e66db018e35fd46ea8a08180ff9df456af69
Author: zhangyang <Gi...@163.com>
AuthorDate: Mon Jul 25 14:29:18 2022 +0800
[ISSUE #728] client shutdown abnormally (#729)
* [ISSUE #757] Fix GetHeader type conflict
Signed-off-by: zhangyang <Gi...@163.com>
* [ISSUE #728] Optimize client instance management
Signed-off-by: zhangyang21 <zh...@xiaomi.com>
* remove duplicate "LogKeyProducerGroup" in rlog.
Co-authored-by: dinglei <li...@163.com>
---
errors/errors.go | 1 +
internal/client.go | 18 +++++++++++++++---
producer/producer.go | 35 ++++++++++++++++++++++-------------
3 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/errors/errors.go b/errors/errors.go
index 195984e..2899506 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -47,5 +47,6 @@ var (
ErrMessageEmpty = errors.New("message is nil")
ErrNotRunning = errors.New("producer not started")
ErrPullConsumer = errors.New("pull consumer has not supported")
+ ErrProducerCreated = errors.New("producer group has been created")
ErrMultipleTopics = errors.New("the topic of the messages in one batch should be the same")
)
diff --git a/internal/client.go b/internal/client.go
index 538c735..bb7538c 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
@@ -180,6 +181,8 @@ type rmqClient struct {
rbMutex sync.Mutex
done chan struct{}
shutdownOnce sync.Once
+
+ instanceCount int32
}
func (c *rmqClient) GetNameSrv() Namesrvs {
@@ -381,6 +384,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
func (c *rmqClient) Start() {
//ctx, cancel := context.WithCancel(context.Background())
//c.cancel = cancel
+ atomic.AddInt32(&c.instanceCount, 1)
c.once.Do(func() {
if !c.option.Credentials.IsEmpty() {
c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
@@ -513,6 +517,10 @@ func (c *rmqClient) removeClient() {
}
func (c *rmqClient) Shutdown() {
+ if atomic.AddInt32(&c.instanceCount, -1) > 0 {
+ return
+ }
+
c.shutdownOnce.Do(func() {
close(c.done)
c.close = true
@@ -800,10 +808,14 @@ func (c *rmqClient) UnregisterConsumer(group string) {
}
func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error {
- _, loaded := c.producerMap.LoadOrStore(group, producer)
- if loaded {
- return fmt.Errorf("the producer group \"%s\" has been created, specify another one", c.option.GroupName)
+ _, exist := c.producerMap.Load(group)
+ if exist {
+ rlog.Warning("the producer group exist already", map[string]interface{}{
+ rlog.LogKeyProducerGroup: group,
+ })
+ return fmt.Errorf("the producer group exist already")
}
+ c.producerMap.Store(group, producer)
return nil
}
diff --git a/producer/producer.go b/producer/producer.go
index 3ead53f..9290494 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -46,6 +46,9 @@ type defaultProducer struct {
callbackCh chan interface{}
interceptor primitive.Interceptor
+
+ startOnce sync.Once
+ ShutdownOnce sync.Once
}
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
@@ -79,22 +82,28 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
}
func (p *defaultProducer) Start() error {
- if p == nil || p.client == nil {
- return fmt.Errorf("client instance is nil, can not start producer")
- }
- atomic.StoreInt32(&p.state, int32(internal.StateRunning))
- err := p.client.RegisterProducer(p.group, p)
- if err != nil {
- return err
- }
- p.client.Start()
- return nil
+ var err error
+ p.startOnce.Do(func() {
+ err = p.client.RegisterProducer(p.group, p)
+ if err != nil {
+ rlog.Error("the producer group has been created, specify another one", map[string]interface{}{
+ rlog.LogKeyProducerGroup: p.group,
+ })
+ err = errors2.ErrProducerCreated
+ return
+ }
+ p.client.Start()
+ atomic.StoreInt32(&p.state, int32(internal.StateRunning))
+ })
+ return err
}
func (p *defaultProducer) Shutdown() error {
- atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
- p.client.UnregisterProducer(p.group)
- p.client.Shutdown()
+ p.ShutdownOnce.Do(func() {
+ atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+ p.client.UnregisterProducer(p.group)
+ p.client.Shutdown()
+ })
return nil
}