You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2022/12/19 07:32:39 UTC
[rocketmq-clients] branch master updated: golang: fast to the ok state (#295)
This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new d821b203 golang: fast to the ok state (#295)
d821b203 is described below
commit d821b203d357f93e0f7fe07521ccb0afcfd1ba29
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Mon Dec 19 15:32:34 2022 +0800
golang: fast to the ok state (#295)
* golang: fast to the ok state
* fix utest
Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
 golang/client.go              |  12 ++++-
 golang/client_manager_test.go |  57 +++++++++++++++++++++++
 golang/client_test.go         |  35 ++++++++++++++
 golang/producer_test.go       | 104 ++++++++++--------------------------------
 golang/simple_consumer.go     |   2 +-
 5 files changed, 128 insertions(+), 82 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index c094f1ea..c9d3132f 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -299,6 +299,14 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
+ for _, address := range cli.accessPoint.GetAddresses() {
+ target := utils.ParseAddress(address)
+ if _, ok := endpointsSet[target]; ok {
+ continue
+ }
+ endpointsSet[target] = true
+ endpoints = append(endpoints, target)
+ }
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
@@ -372,7 +380,7 @@ func (cli *defaultClient) Heartbeat() {
}
func (cli *defaultClient) trySyncSettings() {
- cli.log.Info("start syncSetting")
+ cli.log.Info("start trySyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
@@ -381,7 +389,7 @@ func (cli *defaultClient) trySyncSettings() {
}
func (cli *defaultClient) mustSyncSettings() error {
- cli.log.Info("start syncSetting")
+ cli.log.Info("start mustSyncSettings")
command := cli.getSettingsCommand()
targets := cli.getTotalTargets()
for _, target := range targets {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 2529abc7..ec8bf79d 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -20,6 +20,7 @@ package golang
import (
"context"
"fmt"
+ "io"
"os"
"testing"
"time"
@@ -27,6 +28,7 @@ import (
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
+ "google.golang.org/grpc/metadata"
)
var MOCK_CLIENT_ID = "mock_client_id"
@@ -35,6 +37,60 @@ var MOCK_GROUP = "mock_group"
var MOCK_CLIENT *MockClient
var MOCK_RPC_CLIENT *MockRpcClient
+type MOCK_MessagingService_TelemetryClient struct {
+ trace []string
+}
+
+// CloseSend implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
+ mt.trace = append(mt.trace, "closesend")
+ return nil
+}
+
+// Context implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
+ mt.trace = append(mt.trace, "context")
+ return nil
+}
+
+// Header implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
+ mt.trace = append(mt.trace, "header")
+ return nil, nil
+}
+
+// RecvMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
+ mt.trace = append(mt.trace, "recvmsg")
+ return nil
+}
+
+// SendMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
+ mt.trace = append(mt.trace, "sendmsg")
+ return nil
+}
+
+// Trailer implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
+ mt.trace = append(mt.trace, "trailer")
+ return nil
+}
+
+// Recv implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
+ mt.trace = append(mt.trace, "recv")
+ return nil, io.EOF
+}
+
+// Send implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
+ mt.trace = append(mt.trace, "send")
+ return nil
+}
+
+var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
+
func TestMain(m *testing.M) {
os.Setenv("mq.consoleAppender.enabled", "true")
ResetLogger()
@@ -50,6 +106,7 @@ func TestMain(m *testing.M) {
Code: v2.Code_OK,
},
}, nil).AnyTimes()
+
MOCK_RPC_CLIENT.EXPECT().GracefulStop().Return(nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().GetTarget().Return(fakeAddresss).AnyTimes()
stubs := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
diff --git a/golang/client_test.go b/golang/client_test.go
index 25b43afe..2d339990 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,11 +20,46 @@ package golang
import (
"fmt"
"testing"
+ "time"
"github.com/apache/rocketmq-clients/golang/credentials"
+ gomock "github.com/golang/mock/gomock"
+ "github.com/prashantv/gostub"
)
func TestCLINewClient(t *testing.T) {
+ stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+ RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+ RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+ RPC_CLIENT_IDLE_CHECK_PERIOD: time.Hour,
+
+ HEART_BEAT_INITIAL_DELAY: time.Hour,
+ HEART_BEAT_PERIOD: time.Hour,
+
+ LOG_STATS_INITIAL_DELAY: time.Hour,
+ LOG_STATS_PERIOD: time.Hour,
+
+ SYNC_SETTINGS_DELAY: time.Hour,
+ SYNC_SETTINGS_PERIOD: time.Hour,
+ })
+
+ stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+ if target == fakeAddresss {
+ return MOCK_RPC_CLIENT, nil
+ }
+ return nil, fmt.Errorf("invalid target=%s", target)
+ })
+
+ defer func() {
+ stubs.Reset()
+ stubs2.Reset()
+ }()
+
+ MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+ trace: make([]string, 0),
+ }, nil)
+
endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
cli, err := NewClient(&Config{
Endpoint: endpoints,
diff --git a/golang/producer_test.go b/golang/producer_test.go
index 1313e696..cc6679ea 100644
--- a/golang/producer_test.go
+++ b/golang/producer_test.go
@@ -20,7 +20,6 @@ package golang
import (
"context"
"fmt"
- "io"
"testing"
"time"
@@ -28,7 +27,6 @@ import (
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
- "google.golang.org/grpc/metadata"
)
func TestProducer(t *testing.T) {
@@ -47,7 +45,22 @@ func TestProducer(t *testing.T) {
SYNC_SETTINGS_DELAY: time.Hour,
SYNC_SETTINGS_PERIOD: time.Hour,
})
- defer stubs.Reset()
+
+ stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+ if target == fakeAddresss {
+ return MOCK_RPC_CLIENT, nil
+ }
+ return nil, fmt.Errorf("invalid target=%s", target)
+ })
+
+ defer func() {
+ stubs.Reset()
+ stubs2.Reset()
+ }()
+
+ MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+ trace: make([]string, 0),
+ }, nil).AnyTimes()
endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
p, err := NewProducer(&Config{
@@ -72,7 +85,7 @@ func TestProducer(t *testing.T) {
v2.MessageType_TRANSACTION,
},
}},
- }, nil)
+ }, nil).AnyTimes()
err = p.Start()
if err != nil {
t.Error(err)
@@ -88,7 +101,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
_, err := p.Send(context.TODO(), msg)
if err != nil {
@@ -101,7 +114,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
done := make(chan bool)
p.SendAsync(context.TODO(), msg, func(ctx context.Context, sr []*SendReceipt, err error) {
@@ -118,12 +131,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
- }, nil)
+ }, nil).AnyTimes()
transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
@@ -141,12 +154,12 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), gomock.Any()).Return(&v2.EndTransactionResponse{
Status: &v2.Status{
Code: v2.Code_OK,
},
- }, nil)
+ }, nil).AnyTimes()
transaction := p.BeginTransaction()
_, err := p.SendWithTransaction(context.TODO(), msg, transaction)
@@ -164,7 +177,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
msg.SetMessageGroup(MOCK_GROUP)
defer func() { msg.messageGroup = nil }()
_, err := p.Send(context.TODO(), msg)
@@ -178,7 +191,7 @@ func TestProducer(t *testing.T) {
Code: v2.Code_OK,
},
Entries: []*v2.SendResultEntry{{}},
- }, nil)
+ }, nil).AnyTimes()
msg.SetDelayTimestamp(time.Now().Add(time.Hour))
defer func() { msg.deliveryTimestamp = nil }()
_, err := p.Send(context.TODO(), msg)
@@ -204,19 +217,6 @@ func TestProducer(t *testing.T) {
t.Error(err)
}
})
- t.Run("syncsettings", func(t *testing.T) {
- mt := &MOCK_MessagingService_TelemetryClient{
- trace: make([]string, 0),
- }
- MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(mt, nil)
- p.(*defaultProducer).cli.clientManager.(*defaultClientManager).syncSettings()
- for {
- time.Sleep(time.Duration(100))
- if len(mt.trace) >= 3 && mt.trace[0] == "send" && mt.trace[1] == "recv" && mt.trace[2] == "closesend" {
- break
- }
- }
- })
t.Run("do heartbeat", func(t *testing.T) {
err := p.(*defaultProducer).cli.doHeartbeat(endpoints, nil)
if err != nil {
@@ -224,57 +224,3 @@ func TestProducer(t *testing.T) {
}
})
}
-
-type MOCK_MessagingService_TelemetryClient struct {
- trace []string
-}
-
-// CloseSend implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
- mt.trace = append(mt.trace, "closesend")
- return nil
-}
-
-// Context implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
- mt.trace = append(mt.trace, "context")
- return nil
-}
-
-// Header implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) {
- mt.trace = append(mt.trace, "header")
- return nil, nil
-}
-
-// RecvMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
- mt.trace = append(mt.trace, "recvmsg")
- return nil
-}
-
-// SendMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
- mt.trace = append(mt.trace, "sendmsg")
- return nil
-}
-
-// Trailer implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
- mt.trace = append(mt.trace, "trailer")
- return nil
-}
-
-// Recv implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
- mt.trace = append(mt.trace, "recv")
- return nil, io.EOF
-}
-
-// Send implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) error {
- mt.trace = append(mt.trace, "send")
- return nil
-}
-
-var _ = v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index fe8c7734..7b6342f5 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
break
}
if err != nil {
- sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
+ sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
break
}
sugarBaseLogger.Debugf("receiveMessage response: %v", resp)