You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/31 14:57:25 UTC
[rocketmq-client-go] branch master updated: [ISSUE #876] Safely checking context in trace producer. (#863)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6c2f270 [ISSUE #876] Safely checking context in trace producer. (#863)
6c2f270 is described below
commit 6c2f27086ee14970534136fabc28dc7d1299b3bc
Author: Xuexue <63...@qq.com>
AuthorDate: Sun Jul 31 22:57:20 2022 +0800
[ISSUE #876] Safely checking context in trace producer. (#863)
* [feat] safely producer ctx
* [feat] safely producerCtx
---
primitive/ctx.go | 5 +++--
producer/interceptor.go | 5 ++++-
producer/producer.go | 10 ++++++++--
3 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 936b54c..878dbce 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -168,6 +168,7 @@ func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context {
return context.WithValue(ctx, producerCtx, c)
}
-func GetProducerCtx(ctx context.Context) *ProducerCtx {
- return ctx.Value(producerCtx).(*ProducerCtx)
+func GetProducerCtx(ctx context.Context) (*ProducerCtx, bool) {
+ c, exist := ctx.Value(producerCtx).(*ProducerCtx)
+ return c, exist
}
diff --git a/producer/interceptor.go b/producer/interceptor.go
index 71eb8e7..f77b9c6 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -52,9 +52,12 @@ func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor
return fmt.Errorf("GetOrNewRocketMQClient faild")
}
beginT := time.Now()
+ producerCtx, ok := primitive.GetProducerCtx(ctx)
+ if !ok {
+ return fmt.Errorf("ProducerCtx Not Exist")
+ }
err := next(ctx, req, reply)
- producerCtx := primitive.GetProducerCtx(ctx)
if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
return err
}
diff --git a/producer/producer.go b/producer/producer.go
index 9290494..3a71003 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -301,7 +301,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err error
)
- var producerCtx *primitive.ProducerCtx
+ var (
+ producerCtx *primitive.ProducerCtx
+ ok bool
+ )
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
@@ -315,7 +318,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
}
if p.interceptor != nil {
- producerCtx = primitive.GetProducerCtx(ctx)
+ producerCtx, ok = primitive.GetProducerCtx(ctx)
+ if !ok {
+ return fmt.Errorf("ProducerCtx Not Exist")
+ }
producerCtx.BrokerAddr = addr
producerCtx.MQ = *mq
}