You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/28 06:58:45 UTC

[rocketmq-clients] branch master updated: Fix: increase wait time in recovery tests to avoid flaking (#411)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f434134f Fix: increase wait time in recovery tests to avoid flaking (#411)
f434134f is described below

commit f434134f78fcf4ec36642326452f7ae6b06d6d35
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Tue Mar 28 08:58:38 2023 +0200

    Fix: increase wait time in recovery tests to avoid flaking (#411)
    
    1. Fix flaky tests on Windows
    2. Apply the changes pointed out by static check
---
 .github/workflows/golang_build.yml |  7 +++++--
 .gitignore                         |  4 +++-
 golang/client.go                   |  6 ------
 golang/client_manager_test.go      |  1 +
 golang/client_test.go              | 26 +++++++++++++++-----------
 golang/rpc_client.go               |  2 +-
 golang/simple_consumer.go          |  6 +++---
 7 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/golang_build.yml b/.github/workflows/golang_build.yml
index edb71968..0417803b 100644
--- a/.github/workflows/golang_build.yml
+++ b/.github/workflows/golang_build.yml
@@ -1,5 +1,5 @@
 name: Golang Build
-on: 
+on:
   workflow_call:
 jobs:
   build:
@@ -18,4 +18,7 @@ jobs:
           go-version: ${{ matrix.go }}
       - name: Build
         working-directory: ./golang
-        run: go build && go test ./...
+        run: go build
+      - name: Test
+        working-directory: ./golang
+        run: go test -v
diff --git a/.gitignore b/.gitignore
index 4959de17..1f25160a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,6 @@ composer.lock
 vendor/
 
 # Go
-*.tests
+golang/*.tests
+golang/*.test
+golang/*.exe
diff --git a/golang/client.go b/golang/client.go
index 48104cc0..ce4f9b6a 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -150,16 +150,12 @@ func (cs *defaultClientSession) handleTelemetryCommand(response *v2.TelemetryCom
 	switch c := command.(type) {
 	case *v2.TelemetryCommand_Settings:
 		cs.cli.onSettingsCommand(cs.endpoints, c.Settings)
-		break
 	case *v2.TelemetryCommand_RecoverOrphanedTransactionCommand:
 		cs.cli.onRecoverOrphanedTransactionCommand(cs.endpoints, c.RecoverOrphanedTransactionCommand)
-		break
 	case *v2.TelemetryCommand_VerifyMessageCommand:
 		cs.cli.onVerifyMessageCommand(cs.endpoints, c.VerifyMessageCommand)
-		break
 	case *v2.TelemetryCommand_PrintThreadStackTraceCommand:
 		cs.cli.onPrintThreadStackTraceCommand(cs.endpoints, c.PrintThreadStackTraceCommand)
-		break
 	default:
 		return fmt.Errorf("receive unrecognized command from remote, endpoints=%v, command=%v, clientId=%s", cs.endpoints, command, cs.cli.clientID)
 	}
@@ -512,13 +508,11 @@ func (cli *defaultClient) startUp() error {
 					if err == nil {
 						impl.publishingRouteDataResultCache.Store(topic, plb)
 					}
-					break
 				case *defaultSimpleConsumer:
 					slb, err := NewSubscriptionLoadBalancer(newRoute)
 					if err == nil {
 						impl.subTopicRouteDataResultCache.Store(topic, slb)
 					}
-					break
 				}
 			}
 			return true
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index d34f9564..67bdda81 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -82,6 +82,7 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
 // Recv implements v2.MessagingService_TelemetryClient
 func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
 	mt.trace = append(mt.trace, "recv")
+	sugarBaseLogger.Info("calling recv function", "state", mt.recv_error_count, "cli", mt.cli)
 	if mt.recv_error_count >= 1 {
 		mt.recv_error_count -= 1
 		return nil, io.EOF
diff --git a/golang/client_test.go b/golang/client_test.go
index 69c14c74..2ed17343 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -28,7 +28,6 @@ import (
 	gomock "github.com/golang/mock/gomock"
 	"github.com/prashantv/gostub"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest/observer"
 )
@@ -74,7 +73,6 @@ func BuildCLient(t *testing.T) *defaultClient {
 	if err != nil {
 		t.Error(err)
 	}
-	sugarBaseLogger.Info(cli)
 	err = cli.startUp()
 	if err != nil {
 		t.Error(err)
@@ -191,9 +189,12 @@ func Test_execute_server_telemetry_command_fail(t *testing.T) {
 	default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{})
 
 	// then
-	require.Equal(t, 1, observedLogs.Len())
-	commandExecutionLog := observedLogs.All()[0]
-	assert.Equal(t, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", commandExecutionLog.Message)
+	logs := observedLogs.All()
+	messages := make([]string, len(logs))
+	for index, log := range logs {
+		messages[index] = log.Message
+	}
+	assert.Contains(t, messages, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})")
 }
 
 func Test_execute_server_telemetry_command(t *testing.T) {
@@ -206,9 +207,12 @@ func Test_execute_server_telemetry_command(t *testing.T) {
 	default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command: &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}})
 
 	// then
-	require.Equal(t, 2, observedLogs.Len())
-	commandExecutionLog := observedLogs.All()[1]
-	assert.Equal(t, "Executed command successfully", commandExecutionLog.Message)
+	logs := observedLogs.All()
+	messages := make([]string, len(logs))
+	for index, log := range logs {
+		messages[index] = log.Message
+	}
+	assert.Contains(t, messages, "Executed command successfully")
 }
 
 func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
@@ -228,10 +232,10 @@ func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
 	cli.settings = &simpleConsumerSettings{}
 
 	// when
-	// we wait some time while consumer goroutine runs
 	time.Sleep(3 * time.Second)
 
 	// then
+	sugarBaseLogger.Info(observedLogs.All())
 	commandExecutionLog := observedLogs.All()[:2]
 	assert.Equal(t, "Executed command successfully", commandExecutionLog[0].Message)
 	assert.Equal(t, "Executed command successfully", commandExecutionLog[1].Message)
@@ -254,10 +258,10 @@ func TestRestoreDefaultClientSessionOneError(t *testing.T) {
 	cli.settings = &simpleConsumerSettings{}
 
 	// when
-	// we wait some time while consumer goroutine runs
 	time.Sleep(3 * time.Second)
 
 	// then
+	sugarBaseLogger.Info(observedLogs.All())
 	commandExecutionLog := observedLogs.All()[:3]
 	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
 	assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message)
@@ -281,10 +285,10 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
 	cli.settings = &simpleConsumerSettings{}
 
 	// when
-	// we wait some time while consumer goroutine runs
 	time.Sleep(3 * time.Second)
 
 	// then
+	sugarBaseLogger.Info(observedLogs.All())
 	commandExecutionLog := observedLogs.All()[:2]
 	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
 	assert.Equal(t, "Failed to recover, err=%wEOF", commandExecutionLog[1].Message)
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 98e6863d..6a593b37 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -84,7 +84,7 @@ func (rc *rpcClient) GetTarget() string {
 }
 
 func (rc *rpcClient) idleDuration() time.Duration {
-	return time.Now().Sub(rc.activityNanoTime)
+	return time.Since(rc.activityNanoTime)
 }
 
 func (rc *rpcClient) Close() {}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index c6918f55..dfc72320 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -269,7 +269,7 @@ func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum int3
 	}
 	sc.subscriptionExpressionsLock.RLock()
 	topics := make([]string, 0, len(sc.subscriptionExpressions))
-	for k, _ := range sc.subscriptionExpressions {
+	for k := range sc.subscriptionExpressions {
 		topics = append(topics, k)
 	}
 	sc.subscriptionExpressionsLock.RUnlock()
@@ -305,7 +305,7 @@ func (sc *defaultSimpleConsumer) isClient() {
 }
 
 func (sc *defaultSimpleConsumer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) error {
-	return fmt.Errorf("Ignore orphaned transaction recovery command from remote, which is not expected, client id=%s, command=%v", sc.cli.clientID, command)
+	return fmt.Errorf("ignore orphaned transaction recovery command from remote, which is not expected, client id=%s, command=%v", sc.cli.clientID, command)
 }
 
 func (sc *defaultSimpleConsumer) onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error {
@@ -344,7 +344,7 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
 		sc.subscriptionExpressions = make(map[string]*FilterExpression)
 	}
 	sc.cli.initTopics = make([]string, 0)
-	for topic, _ := range scOpts.subscriptionExpressions {
+	for topic := range scOpts.subscriptionExpressions {
 		sc.cli.initTopics = append(sc.cli.initTopics, topic)
 	}
 	endpoints, err := utils.ParseTarget(config.Endpoint)