You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 06:29:23 UTC

[rocketmq-client-go] branch master updated: [ISSUE #728] client shutdown abnormally (#729)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ce8e66  [ISSUE #728] client shutdown abnormally (#729)
9ce8e66 is described below

commit 9ce8e66db018e35fd46ea8a08180ff9df456af69
Author: zhangyang <Gi...@163.com>
AuthorDate: Mon Jul 25 14:29:18 2022 +0800

    [ISSUE #728] client shutdown abnormally (#729)
    
    * [ISSUE #757] Fix GetHeader type conflict
    
    Signed-off-by: zhangyang <Gi...@163.com>
    
    * [ISSUE #728] Optimize client instance management
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
    
    * remove dupilcate "LogKeyProducerGroup" in rlog.
    
    Co-authored-by: dinglei <li...@163.com>
---
 errors/errors.go     |  1 +
 internal/client.go   | 18 +++++++++++++++---
 producer/producer.go | 35 ++++++++++++++++++++++-------------
 3 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/errors/errors.go b/errors/errors.go
index 195984e..2899506 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -47,5 +47,6 @@ var (
 	ErrMessageEmpty      = errors.New("message is nil")
 	ErrNotRunning        = errors.New("producer not started")
 	ErrPullConsumer      = errors.New("pull consumer has not supported")
+	ErrProducerCreated   = errors.New("producer group has been created")
 	ErrMultipleTopics    = errors.New("the topic of the messages in one batch should be the same")
 )
diff --git a/internal/client.go b/internal/client.go
index 538c735..bb7538c 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -27,6 +27,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
@@ -180,6 +181,8 @@ type rmqClient struct {
 	rbMutex      sync.Mutex
 	done         chan struct{}
 	shutdownOnce sync.Once
+
+	instanceCount int32
 }
 
 func (c *rmqClient) GetNameSrv() Namesrvs {
@@ -381,6 +384,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 func (c *rmqClient) Start() {
 	//ctx, cancel := context.WithCancel(context.Background())
 	//c.cancel = cancel
+	atomic.AddInt32(&c.instanceCount, 1)
 	c.once.Do(func() {
 		if !c.option.Credentials.IsEmpty() {
 			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
@@ -513,6 +517,10 @@ func (c *rmqClient) removeClient() {
 }
 
 func (c *rmqClient) Shutdown() {
+	if atomic.AddInt32(&c.instanceCount, -1) > 0 {
+		return
+	}
+
 	c.shutdownOnce.Do(func() {
 		close(c.done)
 		c.close = true
@@ -800,10 +808,14 @@ func (c *rmqClient) UnregisterConsumer(group string) {
 }
 
 func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error {
-	_, loaded := c.producerMap.LoadOrStore(group, producer)
-	if loaded {
-		return fmt.Errorf("the producer group \"%s\" has been created, specify another one", c.option.GroupName)
+	_, exist := c.producerMap.Load(group)
+	if exist {
+		rlog.Warning("the producer group exist already", map[string]interface{}{
+			rlog.LogKeyProducerGroup: group,
+		})
+		return fmt.Errorf("the producer group exist already")
 	}
+	c.producerMap.Store(group, producer)
 	return nil
 }
 
diff --git a/producer/producer.go b/producer/producer.go
index 3ead53f..9290494 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -46,6 +46,9 @@ type defaultProducer struct {
 	callbackCh  chan interface{}
 
 	interceptor primitive.Interceptor
+
+	startOnce    sync.Once
+	ShutdownOnce sync.Once
 }
 
 func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
@@ -79,22 +82,28 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 }
 
 func (p *defaultProducer) Start() error {
-	if p == nil || p.client == nil {
-		return fmt.Errorf("client instance is nil, can not start producer")
-	}
-	atomic.StoreInt32(&p.state, int32(internal.StateRunning))
-	err := p.client.RegisterProducer(p.group, p)
-	if err != nil {
-		return err
-	}
-	p.client.Start()
-	return nil
+	var err error
+	p.startOnce.Do(func() {
+		err = p.client.RegisterProducer(p.group, p)
+		if err != nil {
+			rlog.Error("the producer group has been created, specify another one", map[string]interface{}{
+				rlog.LogKeyProducerGroup: p.group,
+			})
+			err = errors2.ErrProducerCreated
+			return
+		}
+		p.client.Start()
+		atomic.StoreInt32(&p.state, int32(internal.StateRunning))
+	})
+	return err
 }
 
 func (p *defaultProducer) Shutdown() error {
-	atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
-	p.client.UnregisterProducer(p.group)
-	p.client.Shutdown()
+	p.ShutdownOnce.Do(func() {
+		atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+		p.client.UnregisterProducer(p.group)
+		p.client.Shutdown()
+	})
 	return nil
 }