You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/20 05:43:55 UTC
[rocketmq-clients] branch master updated: try to recover on server TelemetryCommand transmission error (#408)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 550f9699 try to recover on server TelemetryCommand transmission error (#408)
550f9699 is described below
commit 550f9699f17f340fb7684b41bae45e1b57c324de
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Mon Mar 20 06:43:47 2023 +0100
try to recover on server TelemetryCommand transmission error (#408)
* try to recover on server TelemetryCommand transmission error
* refactor modifying the recovery state
* write tests for auxiliary functions
* remove binary files
* add .test files to gitignore
* refactor tests
* write the rest of tests
* move Go .gitignore to global .gitignore
---
.gitignore | 3 +
golang/client.go | 109 +++++++++++++++++----
golang/client_manager.go | 1 -
golang/client_manager_test.go | 17 +++-
golang/client_test.go | 215 ++++++++++++++++++++++++++++++++++++++++++
golang/rpc_client_mock.go | 4 +-
6 files changed, 323 insertions(+), 26 deletions(-)
diff --git a/.gitignore b/.gitignore
index 924f09fb..4959de17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,3 +41,6 @@ rust/src/pb/*.rs
composer.phar
composer.lock
vendor/
+
+# Go
+*.tests
diff --git a/golang/client.go b/golang/client.go
index 2b95f2bc..313092ee 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -51,11 +51,13 @@ type isClient interface {
onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error
}
type defaultClientSession struct {
- endpoints *v2.Endpoints
- observer v2.MessagingService_TelemetryClient
- observerLock sync.RWMutex
- cli *defaultClient
- timeout time.Duration
+ endpoints *v2.Endpoints
+ observer v2.MessagingService_TelemetryClient
+ observerLock sync.RWMutex
+ cli *defaultClient
+ timeout time.Duration
+ recovering bool
+ recoveryWaitTime time.Duration `default:"5s"`
}
func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) {
@@ -64,36 +66,79 @@ func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientS
return nil, err
}
cs := &defaultClientSession{
- endpoints: endpoints,
- cli: cli,
- timeout: 365 * 24 * time.Hour,
+ endpoints: endpoints,
+ cli: cli,
+ timeout: 365 * 24 * time.Hour,
+ recovering: false,
}
cs.startUp()
return cs, nil
}
+
+func (cs *defaultClientSession) _acquire_observer() (v2.MessagingService_TelemetryClient, bool) {
+ cs.observerLock.RLock()
+ observer := cs.observer
+ cs.observerLock.RUnlock()
+
+ if observer == nil {
+ time.Sleep(time.Second)
+ return nil, false
+ } else {
+ return observer, true
+ }
+
+}
+
+func (cs *defaultClientSession) _execute_server_telemetry_command(command *v2.TelemetryCommand) {
+ err := cs.handleTelemetryCommand(command)
+ if err != nil {
+ cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+ } else {
+ cs.cli.log.Info("Executed command successfully")
+ }
+}
+
func (cs *defaultClientSession) startUp() {
cs.cli.log.Infof("defaultClientSession is startUp! endpoints=%v", cs.endpoints)
go func() {
for {
- cs.observerLock.RLock()
- observer := cs.observer
- cs.observerLock.RUnlock()
-
- if observer == nil {
- time.Sleep(time.Second)
+ // ensure that observer is present, if not wait for it to be regenerated on publish.
+ observer, acquired_observer := cs._acquire_observer()
+ if !acquired_observer {
continue
}
+
response, err := observer.Recv()
if err != nil {
- cs.release()
-
- cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+ // we are recovering
+ if !cs.recovering {
+ cs.cli.log.Info("Encountered error while receiving TelemetryCommand, trying to recover")
+ // we wait five seconds to give time for the transmission error to be resolved externally before we attempt to read the message again.
+ time.Sleep(cs.recoveryWaitTime)
+ cs.recovering = true
+ } else {
+ // we are recovering but we failed to read the message again, resetting observer
+ cs.cli.log.Info("Failed to recover, err=%w", err)
+ cs.release()
+ cs.recovering = false
+ }
continue
}
- err = cs.handleTelemetryCommand(response)
- if err != nil {
- cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+ // at this point we received the message and must confirm that the sender is healthy
+ if cs.recovering {
+ // we don't know which server sent the request so we must check that each of the servers is healthy.
+ // we assume that the list of the servers hasn't changed, so the server that sent the message is still present.
+ hearbeat_response, err := cs.cli.clientManager.HeartBeat(context.TODO(), cs.endpoints, &v2.HeartbeatRequest{}, 10*time.Second)
+ if err == nil && hearbeat_response.Status.Code == v2.Code_OK {
+ cs.cli.log.Info("Managed to recover, executing message")
+ cs._execute_server_telemetry_command(response)
+ } else {
+ cs.cli.log.Errorf("Failed to recover, Some of the servers are unhealthy, Heartbeat err=%w", err)
+ cs.release()
+ }
+ cs.recovering = false
}
+ cs._execute_server_telemetry_command(response)
}
}()
}
@@ -199,6 +244,30 @@ var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
return cli, nil
}
+var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) {
+ endpoints, err := utils.ParseTarget(config.Endpoint)
+ if err != nil {
+ return nil, err
+ }
+ cli := &defaultClient{
+ config: config,
+ opts: defaultNSOptions,
+ clientID: utils.GenClientID(),
+ accessPoint: endpoints,
+ messageInterceptors: make([]MessageInterceptor, 0),
+ endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
+ on: *atomic.NewBool(true),
+ clientManager: &MockClientManager{},
+ }
+ cli.log = sugarBaseLogger.With("client_id", cli.clientID)
+ for _, opt := range opts {
+ opt.apply(&cli.opts)
+ }
+ cli.done = make(chan struct{}, 1)
+ cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
+ return cli, nil
+}
+
func (cli *defaultClient) GetClientID() string {
return cli.clientID
}
diff --git a/golang/client_manager.go b/golang/client_manager.go
index 44e71a14..e3e38664 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -137,7 +137,6 @@ func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) {
}
func (cm *defaultClientManager) clearIdleRpcClients() {
- sugarBaseLogger.Info("clientManager start clearIdleRpcClients")
cm.rpcClientTableLock.Lock()
defer cm.rpcClientTableLock.Unlock()
for target, rpcClient := range cm.rpcClientTable {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 6d794c81..d6158288 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -38,7 +38,9 @@ var MOCK_CLIENT *MockClient
var MOCK_RPC_CLIENT *MockRpcClient
type MOCK_MessagingService_TelemetryClient struct {
- trace []string
+ trace []string
+ recv_error_count int `default:"0"`
+ cli *defaultClient `default:"nil"`
}
// CloseSend implements v2.MessagingService_TelemetryClient
@@ -80,7 +82,18 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
- return nil, io.EOF
+ if mt.recv_error_count >= 1 {
+ mt.recv_error_count -= 1
+ return nil, io.EOF
+ } else {
+ if mt.cli == nil {
+ return nil, io.EOF
+ } else {
+ time.Sleep(time.Second)
+ command := mt.cli.getSettingsCommand()
+ return command, nil
+ }
+ }
}
// Send implements v2.MessagingService_TelemetryClient
diff --git a/golang/client_test.go b/golang/client_test.go
index 5265a763..41079ccd 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -18,15 +18,88 @@
package golang
import (
+ "context"
"fmt"
"testing"
"time"
"github.com/apache/rocketmq-clients/golang/credentials"
+ v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zaptest/observer"
)
+func BuildCLient(t *testing.T) *defaultClient {
+ stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+ RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+ RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+ RPC_CLIENT_IDLE_CHECK_PERIOD: time.Hour,
+
+ HEART_BEAT_INITIAL_DELAY: time.Hour,
+ HEART_BEAT_PERIOD: time.Hour,
+
+ LOG_STATS_INITIAL_DELAY: time.Hour,
+ LOG_STATS_PERIOD: time.Hour,
+
+ SYNC_SETTINGS_DELAY: time.Hour,
+ SYNC_SETTINGS_PERIOD: time.Hour,
+ })
+
+ stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+ if target == fakeAddress {
+ return MOCK_RPC_CLIENT, nil
+ }
+ return nil, fmt.Errorf("invalid target=%s", target)
+ })
+
+ defer func() {
+ stubs.Reset()
+ stubs2.Reset()
+ }()
+
+ MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+ trace: make([]string, 0),
+ }, nil)
+
+ endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
+ cli, err := NewClientConcrete(&Config{
+ Endpoint: endpoints,
+ Credentials: &credentials.SessionCredentials{},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ sugarBaseLogger.Info(cli)
+ err = cli.startUp()
+ if err != nil {
+ t.Error(err)
+ }
+
+ return cli
+}
+
+func GetClientAndDefaultClientSession(t *testing.T) (*defaultClient, *defaultClientSession) {
+ cli := BuildCLient(t)
+ default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+ if err != nil {
+ t.Error(err)
+ }
+ return cli, default_cli_session
+}
+
+func PrepareTestLogger(cli *defaultClient) *observer.ObservedLogs {
+ observedZapCore, observedLogs := observer.New(zap.InfoLevel)
+ observedLogger := zap.New(observedZapCore)
+ cli.log = observedLogger.Sugar()
+
+ return observedLogs
+}
+
func TestCLINewClient(t *testing.T) {
stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
@@ -74,3 +147,145 @@ func TestCLINewClient(t *testing.T) {
t.Error(err)
}
}
+
+func Test_acquire_observer_uninitialized(t *testing.T) {
+ // given
+ _, default_cli_session := GetClientAndDefaultClientSession(t)
+
+ // when
+ observer, acquired_observer := default_cli_session._acquire_observer()
+
+ // then
+ if acquired_observer {
+ t.Error("Acquired observer even though it is uninitialized")
+ }
+ if observer != nil {
+ t.Error("Observer should be nil")
+ }
+}
+
+func Test_acquire_observer_initialized(t *testing.T) {
+ // given
+ _, default_cli_session := GetClientAndDefaultClientSession(t)
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+
+ // when
+ observer, acquired_observer := default_cli_session._acquire_observer()
+
+ // then
+ if !acquired_observer {
+ t.Error("Failed to acquire observer even though it is uninitialized")
+ }
+ if observer == nil {
+ t.Error("Observer should be not nil")
+ }
+}
+
+func Test_execute_server_telemetry_command_fail(t *testing.T) {
+ // given
+ cli, default_cli_session := GetClientAndDefaultClientSession(t)
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+ observedLogs := PrepareTestLogger(cli)
+
+ // when
+ default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{})
+
+ // then
+ require.Equal(t, 1, observedLogs.Len())
+ commandExecutionLog := observedLogs.All()[0]
+ assert.Equal(t, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", commandExecutionLog.Message)
+}
+
+func Test_execute_server_telemetry_command(t *testing.T) {
+ // given
+ cli, default_cli_session := GetClientAndDefaultClientSession(t)
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+ observedLogs := PrepareTestLogger(cli)
+
+ // when
+ default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command: &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}})
+
+ // then
+ require.Equal(t, 2, observedLogs.Len())
+ commandExecutionLog := observedLogs.All()[1]
+ assert.Equal(t, "Executed command successfully", commandExecutionLog.Message)
+}
+
+func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
+ // given
+ cli := BuildCLient(t)
+ default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+ if err != nil {
+ t.Error(err)
+ }
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+ observedLogs := PrepareTestLogger(cli)
+ default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+ recv_error_count: 0,
+ cli: cli,
+ }
+ default_cli_session.recoveryWaitTime = time.Second
+ cli.settings = &simpleConsumerSettings{}
+
+ // when
+ // we wait some time while consumer goroutine runs
+ time.Sleep(3 * time.Second)
+
+ // then
+ commandExecutionLog := observedLogs.All()[:2]
+ assert.Equal(t, "Executed command successfully", commandExecutionLog[0].Message)
+ assert.Equal(t, "Executed command successfully", commandExecutionLog[1].Message)
+}
+
+func TestRestoreDefaultClientSessionOneError(t *testing.T) {
+ // given
+ cli := BuildCLient(t)
+ default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+ if err != nil {
+ t.Error(err)
+ }
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+ observedLogs := PrepareTestLogger(cli)
+ default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+ recv_error_count: 1,
+ cli: cli,
+ }
+ default_cli_session.recoveryWaitTime = time.Second
+ cli.settings = &simpleConsumerSettings{}
+
+ // when
+ // we wait some time while consumer goroutine runs
+ time.Sleep(3 * time.Second)
+
+ // then
+ commandExecutionLog := observedLogs.All()[:3]
+ assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
+ assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message)
+ assert.Equal(t, "Executed command successfully", commandExecutionLog[2].Message)
+}
+
+func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
+ // given
+ cli := BuildCLient(t)
+ default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+ if err != nil {
+ t.Error(err)
+ }
+ default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+ observedLogs := PrepareTestLogger(cli)
+ default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+ recv_error_count: 2,
+ cli: cli,
+ }
+ default_cli_session.recoveryWaitTime = time.Second
+ cli.settings = &simpleConsumerSettings{}
+
+ // when
+ // we wait some time while consumer goroutine runs
+ time.Sleep(3 * time.Second)
+
+ // then
+ commandExecutionLog := observedLogs.All()[:2]
+ assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
+ assert.Equal(t, "Failed to recover, err=%wEOF", commandExecutionLog[1].Message)
+}
diff --git a/golang/rpc_client_mock.go b/golang/rpc_client_mock.go
index 0cd2372c..fb2ddf8a 100644
--- a/golang/rpc_client_mock.go
+++ b/golang/rpc_client_mock.go
@@ -215,9 +215,7 @@ func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call {
// idleDuration mocks base method.
func (m *MockRpcClient) idleDuration() time.Duration {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "idleDuration")
- ret0, _ := ret[0].(time.Duration)
- return ret0
+ return time.Hour
}
// idleDuration indicates an expected call of idleDuration.