You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/15 02:44:09 UTC

[GitHub] [rocketmq-client-go] githublaohu commented on a diff in pull request #803: Feat/rpc request

githublaohu commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851007256


##########
internal/client.go:
##########
@@ -331,6 +331,46 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 			client.resetOffset(header.topic, header.group, body.OffsetTable)
 			return nil
 		})
+
+		client.remoteClient.RegisterRequestFunc(ReqPushReplyMessageToClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+			receiveTime := time.Now().UnixNano() / int64(time.Millisecond)
+			rlog.Info("receive push reply to client request...", map[string]interface{}{
+				rlog.LogKeyBroker:        addr.String(),
+				rlog.LogKeyTopic:         req.ExtFields["topic"],

Review Comment:
   Please use a named constant instead of the string literal.
   
   请使用静态变量代理字符串



##########
examples/consumer/rpc/main.go:
##########
@@ -0,0 +1,89 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+const (
+	producerGroup = "please_rename_unique_group_name"
+	consumerGroup = "please_rename_unique_group_name"
+	topic         = "RequestTopic"
+)
+
+func main() {

Review Comment:
   Please use the new API to provide a complete RPC response. The current solution does not feel elegant enough, and it is cumbersome for users.
   
   麻烦,使用新的API提供完整的rpc响应。感觉目前的解决方案不够优雅,用户使用麻烦



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
 	return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+	correlationId := uuid.NewV4().String()
+	requestClientId := p.client.ClientID()
+	msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+	msg.WithProperty(primitive.PropertyMessageReplyToClient, requestClientId)
+	msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttl.Seconds())))
+
+	rlog.Debug("message info:", map[string]interface{}{
+		"clientId":      requestClientId,
+		"correlationId": correlationId,
+		"ttl":           ttl.Seconds(),
+	})
+
+	nameSrv, err := internal.GetNamesrv(requestClientId)
+	if err != nil {
+		return "", errors.Wrap(err, "GetNameServ err")
+	}
+
+	if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {

Review Comment:
   No query is needed here. Let users enable automatic topic creation on the broker, and explicitly note and explain this when the feature is announced.
   
   不需要查询。让用户配置broker自动创建topic的能力。在功能宣讲的时候特地注明,解释



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+	"github.com/pkg/errors"
+
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+	cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+	tmpRrfCache := requestResponseFutureCache{
+		cache: cache.New(5*time.Minute, 10*time.Minute),
+	}
+
+	// OnEvicted delete the timeout RequestResponseFuture, trigger set the failure cause.
+	tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+		rrf, ok := i.(*RequestResponseFuture)
+		if !ok {
+			rlog.Error("convert i to RequestResponseFuture err", map[string]interface{}{
+				"correlationId": s,
+			})
+			return
+		}
+		if !rrf.IsTimeout() {
+			return
+		}
+
+		err := fmt.Errorf("correlationId:%s request timeout, no reply message", s)
+		rrf.CauseErr = err
+		rrf.ExecuteRequestCallback()

Review Comment:
   This only considers the asynchronous callback; the timeout of the synchronous request is not handled.
   
   考虑的异步回调,同步超时问题没有处理



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
 	return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+	correlationId := uuid.NewV4().String()
+	requestClientId := p.client.ClientID()
+	msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+	msg.WithProperty(primitive.PropertyMessageReplyToClient, requestClientId)
+	msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttl.Seconds())))
+
+	rlog.Debug("message info:", map[string]interface{}{
+		"clientId":      requestClientId,
+		"correlationId": correlationId,
+		"ttl":           ttl.Seconds(),
+	})
+
+	nameSrv, err := internal.GetNamesrv(requestClientId)
+	if err != nil {
+		return "", errors.Wrap(err, "GetNameServ err")
+	}
+
+	if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+		// todo
+	}
+
+	return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, msgs ...*primitive.Message) (*primitive.Message, error) {
+	if err := p.checkMsg(msgs...); err != nil {
+		return nil, err
+	}
+
+	p.messagesWithNamespace(msgs...)
+	msg := p.encodeBatch(msgs...)
+
+	correlationId, err := p.prepareSendRequest(msg, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	requestResponseFuture := internal.NewRequestResponseFuture(correlationId, timeout, nil)
+	internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+	defer internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+	f := func(ctx context.Context, result *primitive.SendResult, err error) {
+		if err != nil {
+			requestResponseFuture.SendRequestOk = false
+			requestResponseFuture.ResponseMsg = nil
+			requestResponseFuture.CauseErr = err
+			return
+		}
+		requestResponseFuture.SendRequestOk = true
+	}
+
+	if p.interceptor != nil {
+		primitive.WithMethod(ctx, primitive.SendAsync)
+
+		return nil, p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+			return p.sendAsync(ctx, msg, f)
+		})
+	}
+	if err := p.sendAsync(ctx, msg, f); err != nil {
+		return nil, errors.Wrap(err, "sendAsync error")
+	}
+
+	return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) error {

Review Comment:
   Do not define the method's return value as an error.
   
   不要定义方法的返回为异常



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+	"github.com/pkg/errors"
+
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+	cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+	tmpRrfCache := requestResponseFutureCache{
+		cache: cache.New(5*time.Minute, 10*time.Minute),
+	}
+
+	// OnEvicted delete the timeout RequestResponseFuture, trigger set the failure cause.
+	tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+		rrf, ok := i.(*RequestResponseFuture)
+		if !ok {
+			rlog.Error("convert i to RequestResponseFuture err", map[string]interface{}{
+				"correlationId": s,
+			})
+			return
+		}
+		if !rrf.IsTimeout() {
+			return
+		}
+
+		err := fmt.Errorf("correlationId:%s request timeout, no reply message", s)
+		rrf.CauseErr = err
+		rrf.ExecuteRequestCallback()
+	})
+	return &tmpRrfCache
+}
+
+// SetRequestResponseFuture set rrf to map
+func (fm *requestResponseFutureCache) SetRequestResponseFuture(rrf *RequestResponseFuture) {
+	fm.cache.Set(rrf.CorrelationId, rrf, rrf.Timeout)
+}
+
+// SetResponseToRequestResponseFuture set reply to rrf
+func (fm *requestResponseFutureCache) SetResponseToRequestResponseFuture(correlationId string, reply *primitive.Message) error {
+	rrf, exist := fm.RequestResponseFuture(correlationId)
+	if !exist {
+		return errors.Wrapf(nil, "correlationId:%s not exist in map", correlationId)
+	}
+	rrf.PutResponseMessage(reply)
+	if rrf.RequestCallback != nil {
+		rrf.ExecuteRequestCallback()
+	}
+	return nil
+}
+
+// RequestResponseFuture get rrf from map by the CorrelationId
+func (fm *requestResponseFutureCache) RequestResponseFuture(correlationId string) (*RequestResponseFuture, bool) {
+	res, exists := fm.cache.Get(correlationId)
+	if exists {
+		return res.(*RequestResponseFuture), exists
+	}
+	return nil, exists
+}
+
+// RemoveRequestResponseFuture remove the rrf from map
+func (fm *requestResponseFutureCache) RemoveRequestResponseFuture(correlationId string) {
+	fm.cache.Delete(correlationId)

Review Comment:
   When deleting, please wake up any waiter and execute the asynchronous callback. Also, please pass the error in as a parameter.
   
   删除的时候,请触发唤醒与执行异步回调操作。同时请传递异常进来



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+	"github.com/pkg/errors"
+
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()

Review Comment:
   The naming of the RequestResponseFuture definitions in this file is hard to read and understand.
   
   文件内RequestResponseFuture的命名定义可读性与可理解性比较差



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
 	return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+	correlationId := uuid.NewV4().String()
+	requestClientId := p.client.ClientID()
+	msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+	msg.WithProperty(primitive.PropertyMessageReplyToClient, requestClientId)
+	msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttl.Seconds())))
+
+	rlog.Debug("message info:", map[string]interface{}{
+		"clientId":      requestClientId,
+		"correlationId": correlationId,
+		"ttl":           ttl.Seconds(),
+	})
+
+	nameSrv, err := internal.GetNamesrv(requestClientId)
+	if err != nil {
+		return "", errors.Wrap(err, "GetNameServ err")
+	}
+
+	if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+		// todo
+	}
+
+	return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, msgs ...*primitive.Message) (*primitive.Message, error) {
+	if err := p.checkMsg(msgs...); err != nil {

Review Comment:
   The RPC feature does not need batch capability; one message per call is enough. The related code needs to be corrected accordingly.
   
   rpc功能不需要批量能力。每次只需要一个消息就行了。以及相关代码也需要修正



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+	"github.com/pkg/errors"
+
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+	cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+	tmpRrfCache := requestResponseFutureCache{
+		cache: cache.New(5*time.Minute, 10*time.Minute),
+	}
+
+	// OnEvicted delete the timeout RequestResponseFuture, trigger set the failure cause.
+	tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+		rrf, ok := i.(*RequestResponseFuture)
+		if !ok {
+			rlog.Error("convert i to RequestResponseFuture err", map[string]interface{}{
+				"correlationId": s,
+			})
+			return
+		}
+		if !rrf.IsTimeout() {
+			return
+		}
+
+		err := fmt.Errorf("correlationId:%s request timeout, no reply message", s)
+		rrf.CauseErr = err
+		rrf.ExecuteRequestCallback()

Review Comment:
   The cache deletes entries automatically when they time out.
   On an error or a timeout, should the entry also be removed from the cache?
   
   cache 超时时候自动删除
   如果异常与超时,是否要从cache中删除



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
 	return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+	correlationId := uuid.NewV4().String()
+	requestClientId := p.client.ClientID()
+	msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+	msg.WithProperty(primitive.PropertyMessageReplyToClient, requestClientId)
+	msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttl.Seconds())))
+
+	rlog.Debug("message info:", map[string]interface{}{
+		"clientId":      requestClientId,
+		"correlationId": correlationId,
+		"ttl":           ttl.Seconds(),
+	})
+
+	nameSrv, err := internal.GetNamesrv(requestClientId)
+	if err != nil {
+		return "", errors.Wrap(err, "GetNameServ err")
+	}
+
+	if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+		// todo
+	}
+
+	return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, msgs ...*primitive.Message) (*primitive.Message, error) {
+	if err := p.checkMsg(msgs...); err != nil {
+		return nil, err
+	}
+
+	p.messagesWithNamespace(msgs...)
+	msg := p.encodeBatch(msgs...)
+
+	correlationId, err := p.prepareSendRequest(msg, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	requestResponseFuture := internal.NewRequestResponseFuture(correlationId, timeout, nil)
+	internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+	defer internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+	f := func(ctx context.Context, result *primitive.SendResult, err error) {
+		if err != nil {
+			requestResponseFuture.SendRequestOk = false
+			requestResponseFuture.ResponseMsg = nil
+			requestResponseFuture.CauseErr = err
+			return
+		}
+		requestResponseFuture.SendRequestOk = true
+	}
+
+	if p.interceptor != nil {
+		primitive.WithMethod(ctx, primitive.SendAsync)
+
+		return nil, p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+			return p.sendAsync(ctx, msg, f)
+		})
+	}
+	if err := p.sendAsync(ctx, msg, f); err != nil {
+		return nil, errors.Wrap(err, "sendAsync error")
+	}
+
+	return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) error {
+	if err := p.checkMsg(msgs...); err != nil {
+		return err
+	}
+
+	p.messagesWithNamespace(msgs...)
+	msg := p.encodeBatch(msgs...)
+
+	correlationId, err := p.prepareSendRequest(msg, timeout)
+	if err != nil {
+		return err

Review Comment:
   Please call the asynchronous callback method.
   
   请调用异步回调方法



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org