You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/31 14:57:25 UTC

[rocketmq-client-go] branch master updated: [ISSUE #876] Safely checking context in trace producer. (#863)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c2f270  [ISSUE #876] Safely checking context in trace producer.  (#863)
6c2f270 is described below

commit 6c2f27086ee14970534136fabc28dc7d1299b3bc
Author: Xuexue <63...@qq.com>
AuthorDate: Sun Jul 31 22:57:20 2022 +0800

    [ISSUE #876] Safely checking context in trace producer.  (#863)
    
    * [feat] safely check producer ctx
    
    * [feat] safely check producerCtx
---
 primitive/ctx.go        |  5 +++--
 producer/interceptor.go |  5 ++++-
 producer/producer.go    | 10 ++++++++--
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/primitive/ctx.go b/primitive/ctx.go
index 936b54c..878dbce 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -168,6 +168,7 @@ func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context {
 	return context.WithValue(ctx, producerCtx, c)
 }
 
-func GetProducerCtx(ctx context.Context) *ProducerCtx {
-	return ctx.Value(producerCtx).(*ProducerCtx)
+func GetProducerCtx(ctx context.Context) (*ProducerCtx, bool) {
+	c, exist := ctx.Value(producerCtx).(*ProducerCtx)
+	return c, exist
 }
diff --git a/producer/interceptor.go b/producer/interceptor.go
index 71eb8e7..f77b9c6 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -52,9 +52,12 @@ func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor
 			return fmt.Errorf("GetOrNewRocketMQClient faild")
 		}
 		beginT := time.Now()
+		producerCtx, ok := primitive.GetProducerCtx(ctx)
+		if !ok {
+			return fmt.Errorf("ProducerCtx Not Exist")
+		}
 		err := next(ctx, req, reply)
 
-		producerCtx := primitive.GetProducerCtx(ctx)
 		if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
 			return err
 		}
diff --git a/producer/producer.go b/producer/producer.go
index 9290494..3a71003 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -301,7 +301,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 		err error
 	)
 
-	var producerCtx *primitive.ProducerCtx
+	var (
+		producerCtx *primitive.ProducerCtx
+		ok          bool
+	)
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
 		if mq == nil {
@@ -315,7 +318,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 		}
 
 		if p.interceptor != nil {
-			producerCtx = primitive.GetProducerCtx(ctx)
+			producerCtx, ok = primitive.GetProducerCtx(ctx)
+			if !ok {
+				return fmt.Errorf("ProducerCtx Not Exist")
+			}
 			producerCtx.BrokerAddr = addr
 			producerCtx.MQ = *mq
 		}