You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/20 05:43:55 UTC

[rocketmq-clients] branch master updated: try to recover on server TelemetryCommand transmission error (#408)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 550f9699 try to recover on server TelemetryCommand transmission error (#408)
550f9699 is described below

commit 550f9699f17f340fb7684b41bae45e1b57c324de
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Mon Mar 20 06:43:47 2023 +0100

    try to recover on server TelemetryCommand transmission error (#408)
    
    * try to recover on server TelemetryCommand transmission error
    
    * refactor modifying the recovery state
    
    * write tests for auxiliary functions
    
    * remove binary files
    
    * add .test files to gitignore
    
    * refactor tests
    
    * write the rest of the tests
    
    * move Go .gitignore to global .gitignore
---
 .gitignore                    |   3 +
 golang/client.go              | 109 +++++++++++++++++----
 golang/client_manager.go      |   1 -
 golang/client_manager_test.go |  17 +++-
 golang/client_test.go         | 215 ++++++++++++++++++++++++++++++++++++++++++
 golang/rpc_client_mock.go     |   4 +-
 6 files changed, 323 insertions(+), 26 deletions(-)

diff --git a/.gitignore b/.gitignore
index 924f09fb..4959de17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,3 +41,6 @@ rust/src/pb/*.rs
 composer.phar
 composer.lock
 vendor/
+
+# Go test binaries (go test -c writes <pkg>.test)
+*.test
diff --git a/golang/client.go b/golang/client.go
index 2b95f2bc..313092ee 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -51,11 +51,13 @@ type isClient interface {
 	onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error
 }
 type defaultClientSession struct {
-	endpoints    *v2.Endpoints
-	observer     v2.MessagingService_TelemetryClient
-	observerLock sync.RWMutex
-	cli          *defaultClient
-	timeout      time.Duration
+	endpoints        *v2.Endpoints
+	observer         v2.MessagingService_TelemetryClient
+	observerLock     sync.RWMutex
+	cli              *defaultClient
+	timeout          time.Duration
+	recovering       bool          // set after a Recv error while the session waits to retry
+	recoveryWaitTime time.Duration // pause before retrying Recv; struct tags set no defaults, and zero means no pause
 }
 
 func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) {
@@ -64,36 +66,79 @@ func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientS
 		return nil, err
 	}
 	cs := &defaultClientSession{
-		endpoints: endpoints,
-		cli:       cli,
-		timeout:   365 * 24 * time.Hour,
+		endpoints:        endpoints,
+		cli:              cli,
+		timeout:          365 * 24 * time.Hour,
+		recoveryWaitTime: 5 * time.Second,
 	}
 	cs.startUp()
 	return cs, nil
 }
+
+// _acquire_observer returns the current observer, or sleeps for one second and reports false when none is set.
+func (cs *defaultClientSession) _acquire_observer() (v2.MessagingService_TelemetryClient, bool) {
+	cs.observerLock.RLock()
+	observer := cs.observer
+	cs.observerLock.RUnlock()
+
+	if observer == nil {
+		// back off so a polling caller does not spin while no observer is set.
+		time.Sleep(time.Second)
+		return nil, false
+	}
+	return observer, true
+}
+
+// _execute_server_telemetry_command runs a server-sent command and logs the outcome.
+func (cs *defaultClientSession) _execute_server_telemetry_command(command *v2.TelemetryCommand) {
+	if err := cs.handleTelemetryCommand(command); err != nil {
+		cs.cli.log.Errorf("telemetryCommand recv err=%w", err) // NOTE(review): %w is only meaningful to fmt.Errorf; this renders as %!w(...)
+		return
+	}
+	cs.cli.log.Info("Executed command successfully")
+}
+
 func (cs *defaultClientSession) startUp() {
 	cs.cli.log.Infof("defaultClientSession is startUp! endpoints=%v", cs.endpoints)
 	go func() {
 		for {
-			cs.observerLock.RLock()
-			observer := cs.observer
-			cs.observerLock.RUnlock()
-
-			if observer == nil {
-				time.Sleep(time.Second)
+			// ensure that observer is present, if not wait for it to be regenerated on publish.
+			observer, ok := cs._acquire_observer()
+			if !ok {
 				continue
 			}
+
 			response, err := observer.Recv()
 			if err != nil {
-				cs.release()
-
-				cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+				// first failure: pause and retry once; second consecutive failure: reset the observer.
+				if !cs.recovering {
+					cs.cli.log.Info("Encountered error while receiving TelemetryCommand, trying to recover")
+					// give the transmission error time to be resolved externally before reading again.
+					time.Sleep(cs.recoveryWaitTime)
+					cs.recovering = true
+				} else {
+					// the retry failed as well: reset the observer.
+					cs.cli.log.Info("Failed to recover, err=%w", err) // NOTE(review): Info does not format; this logs "...err=%wEOF"
+					cs.release()
+					cs.recovering = false
+				}
 				continue
 			}
-			err = cs.handleTelemetryCommand(response)
-			if err != nil {
-				cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+			// first message after a Recv error: confirm the peer is healthy before acting on it.
+			if cs.recovering {
+				// only this session's endpoints are probed; the message is dropped if the probe fails.
+				cs.recovering = false
+				heartbeatResponse, err := cs.cli.clientManager.HeartBeat(context.TODO(), cs.endpoints, &v2.HeartbeatRequest{}, 10*time.Second)
+				if err != nil || heartbeatResponse.GetStatus().GetCode() != v2.Code_OK {
+					cs.cli.log.Errorf("Failed to recover, server is unhealthy, heartbeat code=%v err=%v", heartbeatResponse.GetStatus().GetCode(), err)
+					cs.release()
+					// skip execution: the stream was just released and its peer failed the health check.
+					continue
+				}
+				cs.cli.log.Info("Managed to recover, executing message")
+				// fall through to the single execution below.
 			}
+			cs._execute_server_telemetry_command(response)
 		}
 	}()
 }
@@ -199,6 +244,30 @@ var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
 	return cli, nil
 }
 
+var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) {
+	endpoints, err := utils.ParseTarget(config.Endpoint)
+	if err != nil {
+		return nil, err
+	}
+	cli := &defaultClient{
+		config:                        config,
+		opts:                          defaultNSOptions,
+		clientID:                      utils.GenClientID(),
+		accessPoint:                   endpoints,
+		messageInterceptors:           make([]MessageInterceptor, 0),
+		endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
+		on:                            *atomic.NewBool(true),
+		clientManager:                 &MockClientManager{}, // NOTE(review): test-only mock wired into production code; consider moving this helper into a _test.go file
+	}
+	cli.log = sugarBaseLogger.With("client_id", cli.clientID)
+	for _, opt := range opts {
+		opt.apply(&cli.opts)
+	}
+	cli.done = make(chan struct{}, 1)
+	cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
+	return cli, nil
+}
+
 func (cli *defaultClient) GetClientID() string {
 	return cli.clientID
 }
diff --git a/golang/client_manager.go b/golang/client_manager.go
index 44e71a14..e3e38664 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -137,7 +137,6 @@ func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) {
 }
 
 func (cm *defaultClientManager) clearIdleRpcClients() {
-	sugarBaseLogger.Info("clientManager start clearIdleRpcClients")
 	cm.rpcClientTableLock.Lock()
 	defer cm.rpcClientTableLock.Unlock()
 	for target, rpcClient := range cm.rpcClientTable {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 6d794c81..d6158288 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -38,7 +38,9 @@ var MOCK_CLIENT *MockClient
 var MOCK_RPC_CLIENT *MockRpcClient
 
 type MOCK_MessagingService_TelemetryClient struct {
-	trace []string
+	trace            []string
+	recv_error_count int            // Recv calls that return io.EOF before any command is delivered
+	cli              *defaultClient // source of settings commands; when nil, Recv only ever returns io.EOF
 }
 
 // CloseSend implements v2.MessagingService_TelemetryClient
@@ -80,7 +82,18 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
 // Recv implements v2.MessagingService_TelemetryClient
 func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
 	mt.trace = append(mt.trace, "recv")
-	return nil, io.EOF
+	// fail the first recv_error_count calls to simulate transmission errors.
+	if mt.recv_error_count > 0 {
+		mt.recv_error_count--
+		return nil, io.EOF
+	}
+	if mt.cli == nil {
+		return nil, io.EOF
+	}
+	// delay each successful Recv by one second, then hand back a settings command.
+	time.Sleep(time.Second)
+	command := mt.cli.getSettingsCommand()
+	return command, nil
 }
 
 // Send implements v2.MessagingService_TelemetryClient
diff --git a/golang/client_test.go b/golang/client_test.go
index 5265a763..41079ccd 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -18,15 +18,88 @@
 package golang
 
 import (
+	"context"
 	"fmt"
 	"testing"
 	"time"
 
 	"github.com/apache/rocketmq-clients/golang/credentials"
+	v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
 	gomock "github.com/golang/mock/gomock"
 	"github.com/prashantv/gostub"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
 )
 
+func BuildCLient(t *testing.T) *defaultClient {
+	stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+		RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+		RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+		RPC_CLIENT_IDLE_CHECK_PERIOD:        time.Hour,
+
+		HEART_BEAT_INITIAL_DELAY: time.Hour,
+		HEART_BEAT_PERIOD:        time.Hour,
+
+		LOG_STATS_INITIAL_DELAY: time.Hour,
+		LOG_STATS_PERIOD:        time.Hour,
+
+		SYNC_SETTINGS_DELAY:  time.Hour,
+		SYNC_SETTINGS_PERIOD: time.Hour,
+	})
+
+	stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts ...RpcClientOption) (RpcClient, error) {
+		if target == fakeAddress {
+			return MOCK_RPC_CLIENT, nil
+		}
+		return nil, fmt.Errorf("invalid target=%s", target)
+	})
+
+	defer func() {
+		stubs.Reset()
+		stubs2.Reset()
+	}()
+
+	MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+		trace: make([]string, 0),
+	}, nil)
+
+	endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
+	cli, err := NewClientConcrete(&Config{
+		Endpoint:    endpoints,
+		Credentials: &credentials.SessionCredentials{},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	sugarBaseLogger.Info(cli)
+	err = cli.startUp()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return cli
+}
+
+func GetClientAndDefaultClientSession(t *testing.T) (*defaultClient, *defaultClientSession) {
+	cli := BuildCLient(t)
+	default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return cli, default_cli_session
+}
+
+func PrepareTestLogger(cli *defaultClient) *observer.ObservedLogs {
+	observedZapCore, observedLogs := observer.New(zap.InfoLevel)
+	observedLogger := zap.New(observedZapCore)
+	cli.log = observedLogger.Sugar()
+
+	return observedLogs
+}
+
 func TestCLINewClient(t *testing.T) {
 	stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
 		RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
@@ -74,3 +147,145 @@ func TestCLINewClient(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+func Test_acquire_observer_uninitialized(t *testing.T) {
+	// given
+	_, default_cli_session := GetClientAndDefaultClientSession(t)
+
+	// when
+	observer, acquired_observer := default_cli_session._acquire_observer()
+
+	// then
+	if acquired_observer {
+		t.Error("Acquired observer even though it is uninitialized")
+	}
+	if observer != nil {
+		t.Error("Observer should be nil")
+	}
+}
+
+func Test_acquire_observer_initialized(t *testing.T) {
+	// given
+	_, default_cli_session := GetClientAndDefaultClientSession(t)
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+
+	// when
+	observer, acquired_observer := default_cli_session._acquire_observer()
+
+	// then
+	if !acquired_observer {
+		t.Error("Failed to acquire observer even though it is initialized")
+	}
+	if observer == nil {
+		t.Error("Observer should not be nil")
+	}
+}
+
+func Test_execute_server_telemetry_command_fail(t *testing.T) {
+	// given
+	cli, default_cli_session := GetClientAndDefaultClientSession(t)
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+	observedLogs := PrepareTestLogger(cli)
+
+	// when
+	default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{})
+
+	// then
+	require.Equal(t, 1, observedLogs.Len())
+	commandExecutionLog := observedLogs.All()[0]
+	assert.Equal(t, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", commandExecutionLog.Message)
+}
+
+func Test_execute_server_telemetry_command(t *testing.T) {
+	// given
+	cli, default_cli_session := GetClientAndDefaultClientSession(t)
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+	observedLogs := PrepareTestLogger(cli)
+
+	// when
+	default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command: &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}})
+
+	// then
+	require.Equal(t, 2, observedLogs.Len())
+	commandExecutionLog := observedLogs.All()[1]
+	assert.Equal(t, "Executed command successfully", commandExecutionLog.Message)
+}
+
+func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
+	// given
+	cli := BuildCLient(t)
+	default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+	if err != nil {
+		t.Fatal(err)
+	}
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+	observedLogs := PrepareTestLogger(cli)
+	default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+		recv_error_count: 0,
+		cli:              cli,
+	}
+	default_cli_session.recoveryWaitTime = time.Second
+	cli.settings = &simpleConsumerSettings{}
+
+	// when
+	// we wait some time while consumer goroutine runs
+	time.Sleep(3 * time.Second)
+	// then
+	require.True(t, observedLogs.Len() >= 2, "expected at least 2 log entries, got %d", observedLogs.Len())
+	commandExecutionLog := observedLogs.All()[:2]
+	assert.Equal(t, "Executed command successfully", commandExecutionLog[0].Message)
+	assert.Equal(t, "Executed command successfully", commandExecutionLog[1].Message)
+}
+
+func TestRestoreDefaultClientSessionOneError(t *testing.T) {
+	// given
+	cli := BuildCLient(t)
+	default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+	if err != nil {
+		t.Fatal(err)
+	}
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+	observedLogs := PrepareTestLogger(cli)
+	default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+		recv_error_count: 1,
+		cli:              cli,
+	}
+	default_cli_session.recoveryWaitTime = time.Second
+	cli.settings = &simpleConsumerSettings{}
+
+	// when
+	// we wait some time while consumer goroutine runs
+	time.Sleep(3 * time.Second)
+	// then
+	require.True(t, observedLogs.Len() >= 3, "expected at least 3 log entries, got %d", observedLogs.Len())
+	commandExecutionLog := observedLogs.All()[:3]
+	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
+	assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message)
+	assert.Equal(t, "Executed command successfully", commandExecutionLog[2].Message)
+}
+
+func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
+	// given
+	cli := BuildCLient(t)
+	default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+	if err != nil {
+		t.Fatal(err)
+	}
+	default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+	observedLogs := PrepareTestLogger(cli)
+	default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+		recv_error_count: 2,
+		cli:              cli,
+	}
+	default_cli_session.recoveryWaitTime = time.Second
+	cli.settings = &simpleConsumerSettings{}
+
+	// when
+	// we wait some time while consumer goroutine runs
+	time.Sleep(3 * time.Second)
+	// then
+	require.True(t, observedLogs.Len() >= 2, "expected at least 2 log entries, got %d", observedLogs.Len())
+	commandExecutionLog := observedLogs.All()[:2]
+	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
+	assert.Equal(t, "Failed to recover, err=%wEOF", commandExecutionLog[1].Message)
+}
diff --git a/golang/rpc_client_mock.go b/golang/rpc_client_mock.go
index 0cd2372c..fb2ddf8a 100644
--- a/golang/rpc_client_mock.go
+++ b/golang/rpc_client_mock.go
@@ -215,9 +215,7 @@ func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call {
 // idleDuration mocks base method.
 func (m *MockRpcClient) idleDuration() time.Duration {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "idleDuration")
-	ret0, _ := ret[0].(time.Duration)
-	return ret0
+	return time.Hour // NOTE(review): hand-edited mock; no longer routed through m.ctrl.Call, so expectations recorded via EXPECT().idleDuration() are never matched and every call reports one hour
 }
 
 // idleDuration indicates an expected call of idleDuration.