You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2022/12/19 07:32:39 UTC

[rocketmq-clients] branch master updated: golang: fast to the ok state (#295)

This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new d821b203 golang: fast to the ok state (#295)
d821b203 is described below

commit d821b203d357f93e0f7fe07521ccb0afcfd1ba29
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Mon Dec 19 15:32:34 2022 +0800

    golang: fast to the ok state (#295)
    
    * golang: fast to the ok state
    
    * fix utest
    
    Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
 golang/client.go              |  12 ++++-
 golang/client_manager_test.go |  57 +++++++++++++++++++++++
 golang/client_test.go         |  35 ++++++++++++++
 golang/producer_test.go       | 104 ++++++++++--------------------------------
 golang/simple_consumer.go     |   2 +-
 5 files changed, 128 insertions(+), 82 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index c094f1ea..c9d3132f 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -299,6 +299,14 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
 func (cli *defaultClient) getTotalTargets() []string {
 	endpoints := make([]string, 0)
 	endpointsSet := make(map[string]bool)
+	for _, address := range cli.accessPoint.GetAddresses() {
+		target := utils.ParseAddress(address)
+		if _, ok := endpointsSet[target]; ok {
+			continue
+		}
+		endpointsSet[target] = true
+		endpoints = append(endpoints, target)
+	}
 	cli.router.Range(func(_, v interface{}) bool {
 		messageQueues := v.([]*v2.MessageQueue)
 		for _, messageQueue := range messageQueues {
@@ -372,7 +380,7 @@ func (cli *defaultClient) Heartbeat() {
 }
 
 func (cli *defaultClient) trySyncSettings() {
-	cli.log.Info("start syncSetting")
+	cli.log.Info("start trySyncSettings")
 	command := cli.getSettingsCommand()
 	targets := cli.getTotalTargets()
 	for _, target := range targets {
@@ -381,7 +389,7 @@ func (cli *defaultClient) trySyncSettings() {
 }
 
 func (cli *defaultClient) mustSyncSettings() error {
-	cli.log.Info("start syncSetting")
+	cli.log.Info("start mustSyncSettings")
 	command := cli.getSettingsCommand()
 	targets := cli.getTotalTargets()
 	for _, target := range targets {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 2529abc7..ec8bf79d 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -20,6 +20,7 @@ package golang
 import (
 	"context"
 	"fmt"
+	"io"
 	"os"
 	"testing"
 	"time"
@@ -27,6 +28,7 @@ import (
 	v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
 	gomock "github.com/golang/mock/gomock"
 	"github.com/prashantv/gostub"
+	"google.golang.org/grpc/metadata"
 )
 
 var MOCK_CLIENT_ID = "mock_client_id"
@@ -35,6 +37,60 @@ var MOCK_GROUP = "mock_group"
 var MOCK_CLIENT *MockClient
 var MOCK_RPC_CLIENT *MockRpcClient
 
+type MOCK_MessagingService_TelemetryClient struct {
+	trace []string
+}
+
+// CloseSend implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
+	mt.trace = append(mt.trace, "closesend")
+	return nil
+}
+
+// Context implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
+	mt.trace = append(mt.trace, "context")
+	return nil
+}
+
+// Header implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
+	mt.trace = append(mt.trace, "header")
+	return nil, nil
+}
+
+// RecvMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
+	mt.trace = append(mt.trace, "recvmsg")
+	return nil
+}
+
+// SendMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
+	mt.trace = append(mt.trace, "sendmsg")
+	return nil
+}
+
+// Trailer implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
+	mt.trace = append(mt.trace, "trailer")
+	return nil
+}
+
+// Recv implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
+	mt.trace = append(mt.trace, "recv")
+	return nil, io.EOF
+}
+
+// Send implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
+	mt.trace = append(mt.trace, "send")
+	return nil
+}
+
+var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
+
 func TestMain(m *testing.M) {
 	os.Setenv("mq.consoleAppender.enabled", "true")
 	ResetLogger()
@@ -50,6 +106,7 @@ func TestMain(m *testing.M) {
 			Code: v2.Code_OK,
 		},
 	}, nil).AnyTimes()
+
 	MOCK_RPC_CLIENT.EXPECT().GracefulStop().Return(nil).AnyTimes()
 	MOCK_RPC_CLIENT.EXPECT().GetTarget().Return(fakeAddresss).AnyTimes()
 	stubs := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
diff --git a/golang/client_test.go b/golang/client_test.go
index 25b43afe..2d339990 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,11 +20,46 @@ package golang
 import (
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/apache/rocketmq-clients/golang/credentials"
+	gomock "github.com/golang/mock/gomock"
+	"github.com/prashantv/gostub"
 )
 
 func TestCLINewClient(t *testing.T) {
+	stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+		RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+		RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+		RPC_CLIENT_IDLE_CHECK_PERIOD:        time.Hour,
+
+		HEART_BEAT_INITIAL_DELAY: time.Hour,
+		HEART_BEAT_PERIOD:        time.Hour,
+
+		LOG_STATS_INITIAL_DELAY: time.Hour,
+		LOG_STATS_PERIOD:        time.Hour,
+
+		SYNC_SETTINGS_DELAY:  time.Hour,
+		SYNC_SETTINGS_PERIOD: time.Hour,
+	})
+
+	stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+		if target == fakeAddresss {
+			return MOCK_RPC_CLIENT, nil
+		}
+		return nil, fmt.Errorf("invalid target=%s", target)
+	})
+
+	defer func() {
+		stubs.Reset()
+		stubs2.Reset()
+	}()
+
+	MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+		trace: make([]string, 0),
+	}, nil)
+
 	endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
 	cli, err := NewClient(&Config{
 		Endpoint:    endpoints,
diff --git a/golang/producer_test.go b/golang/producer_test.go
index 1313e696..cc6679ea 100644
--- a/golang/producer_test.go
+++ b/golang/producer_test.go
@@ -20,7 +20,6 @@ package golang
 import (
 	"context"
 	"fmt"
-	"io"
 	"testing"
 	"time"
 
@@ -28,7 +27,6 @@ import (
 	v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
 	gomock "github.com/golang/mock/gomock"
 	"github.com/prashantv/gostub"
-	"google.golang.org/grpc/metadata"
 )
 
 func TestProducer(t *testing.T) {
@@ -47,7 +45,22 @@ func TestProducer(t *testing.T) {
 		SYNC_SETTINGS_DELAY:  time.Hour,
 		SYNC_SETTINGS_PERIOD: time.Hour,
 	})
-	defer stubs.Reset()
+
+	stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+		if target == fakeAddresss {
+			return MOCK_RPC_CLIENT, nil
+		}
+		return nil, fmt.Errorf("invalid target=%s", target)
+	})
+
+	defer func() {
+		stubs.Reset()
+		stubs2.Reset()
+	}()
+
+	MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+		trace: make([]string, 0),
+	}, nil).AnyTimes()
 
 	endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
 	p, err := NewProducer(&Config{
@@ -72,7 +85,7 @@ func TestProducer(t *testing.T) {
 				v2.MessageType_TRANSACTION,
 			},
 		}},
-	}, nil)
+	}, nil).AnyTimes()
 	err = p.Start()
 	if err != nil {
 		t.Error(err)
@@ -88,7 +101,7 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 
 		_, err := p.Send(context.TODO(), msg)
 		if err != nil {
@@ -101,7 +114,7 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 
 		done := make(chan bool)
 		p.SendAsync(context.TODO(), msg, func(ctx context.Context, sr []*SendReceipt, err error) {
@@ -118,12 +131,12 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 		MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
 			Status: &v2.Status{
 				Code: v2.Code_OK,
 			},
-		}, nil)
+		}, nil).AnyTimes()
 
 		transaction := p.BeginTransaction()
 		_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
@@ -141,12 +154,12 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 		MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
 			Status: &v2.Status{
 				Code: v2.Code_OK,
 			},
-		}, nil)
+		}, nil).AnyTimes()
 
 		transaction := p.BeginTransaction()
 		_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
@@ -164,7 +177,7 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 		msg.SetMessageGroup(MOCK_GROUP)
 		defer func() { msg.messageGroup = nil }()
 		_, err := p.Send(context.TODO(), msg)
@@ -178,7 +191,7 @@ func TestProducer(t *testing.T) {
 				Code: v2.Code_OK,
 			},
 			Entries: []*v2.SendResultEntry{{}},
-		}, nil)
+		}, nil).AnyTimes()
 		msg.SetDelayTimestamp(time.Now().Add(time.Hour))
 		defer func() { msg.deliveryTimestamp = nil }()
 		_, err := p.Send(context.TODO(), msg)
@@ -204,19 +217,6 @@ func TestProducer(t *testing.T) {
 			t.Error(err)
 		}
 	})
-	t.Run("syncsettings", func(t *testing.T) {
-		mt := &MOCK_MessagingService_TelemetryClient{
-			trace: make([]string, 0),
-		}
-		MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(mt, nil)
-		p.(*defaultProducer).cli.clientManager.(*defaultClientManager).syncSettings()
-		for {
-			time.Sleep(time.Duration(100))
-			if len(mt.trace) >= 3 && mt.trace[0] == "send" && mt.trace[1] == "recv" && mt.trace[2] == "closesend" {
-				break
-			}
-		}
-	})
 	t.Run("do heartbeat", func(t *testing.T) {
 		err := p.(*defaultProducer).cli.doHeartbeat(endpoints, nil)
 		if err != nil {
@@ -224,57 +224,3 @@ func TestProducer(t *testing.T) {
 		}
 	})
 }
-
-type MOCK_MessagingService_TelemetryClient struct {
-	trace []string
-}
-
-// CloseSend implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
-	mt.trace = append(mt.trace, "closesend")
-	return nil
-}
-
-// Context implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
-	mt.trace = append(mt.trace, "context")
-	return nil
-}
-
-// Header implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
-	mt.trace = append(mt.trace, "header")
-	return nil, nil
-}
-
-// RecvMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
-	mt.trace = append(mt.trace, "recvmsg")
-	return nil
-}
-
-// SendMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
-	mt.trace = append(mt.trace, "sendmsg")
-	return nil
-}
-
-// Trailer implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
-	mt.trace = append(mt.trace, "trailer")
-	return nil
-}
-
-// Recv implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
-	mt.trace = append(mt.trace, "recv")
-	return nil, io.EOF
-}
-
-// Send implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
-	mt.trace = append(mt.trace, "send")
-	return nil
-}
-
-var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index fe8c7734..7b6342f5 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
 				break
 			}
 			if err != nil {
-				sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
+				sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
 				break
 			}
 			sugarBaseLogger.Debugf("receiveMessage response: %v", resp)