You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/28 06:58:45 UTC
[rocketmq-clients] branch master updated: Fix: increase wait time in recovery tests to avoid flaking (#411)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f434134f Fix: increase wait time in recovery tests to avoid flaking (#411)
f434134f is described below
commit f434134f78fcf4ec36642326452f7ae6b06d6d35
Author: Paweł Biegun <69...@users.noreply.github.com>
AuthorDate: Tue Mar 28 08:58:38 2023 +0200
Fix: increase wait time in recovery tests to avoid flaking (#411)
1. Fix flaky tests on Windows
2. Apply the changes suggested by static analysis (staticcheck)
---
.github/workflows/golang_build.yml | 7 +++++--
.gitignore | 4 +++-
golang/client.go | 6 ------
golang/client_manager_test.go | 1 +
golang/client_test.go | 26 +++++++++++++++-----------
golang/rpc_client.go | 2 +-
golang/simple_consumer.go | 6 +++---
7 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/.github/workflows/golang_build.yml b/.github/workflows/golang_build.yml
index edb71968..0417803b 100644
--- a/.github/workflows/golang_build.yml
+++ b/.github/workflows/golang_build.yml
@@ -1,5 +1,5 @@
name: Golang Build
-on:
+on:
workflow_call:
jobs:
build:
@@ -18,4 +18,7 @@ jobs:
go-version: ${{ matrix.go }}
- name: Build
working-directory: ./golang
- run: go build && go test ./...
+ run: go build
+ - name: Test
+ working-directory: ./golang
+ run: go test -v
diff --git a/.gitignore b/.gitignore
index 4959de17..1f25160a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,6 @@ composer.lock
vendor/
# Go
-*.tests
+golang/*.tests
+golang/*.test
+golang/*.exe
diff --git a/golang/client.go b/golang/client.go
index 48104cc0..ce4f9b6a 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -150,16 +150,12 @@ func (cs *defaultClientSession) handleTelemetryCommand(response *v2.TelemetryCom
switch c := command.(type) {
case *v2.TelemetryCommand_Settings:
cs.cli.onSettingsCommand(cs.endpoints, c.Settings)
- break
case *v2.TelemetryCommand_RecoverOrphanedTransactionCommand:
cs.cli.onRecoverOrphanedTransactionCommand(cs.endpoints, c.RecoverOrphanedTransactionCommand)
- break
case *v2.TelemetryCommand_VerifyMessageCommand:
cs.cli.onVerifyMessageCommand(cs.endpoints, c.VerifyMessageCommand)
- break
case *v2.TelemetryCommand_PrintThreadStackTraceCommand:
cs.cli.onPrintThreadStackTraceCommand(cs.endpoints, c.PrintThreadStackTraceCommand)
- break
default:
return fmt.Errorf("receive unrecognized command from remote, endpoints=%v, command=%v, clientId=%s", cs.endpoints, command, cs.cli.clientID)
}
@@ -512,13 +508,11 @@ func (cli *defaultClient) startUp() error {
if err == nil {
impl.publishingRouteDataResultCache.Store(topic, plb)
}
- break
case *defaultSimpleConsumer:
slb, err := NewSubscriptionLoadBalancer(newRoute)
if err == nil {
impl.subTopicRouteDataResultCache.Store(topic, slb)
}
- break
}
}
return true
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index d34f9564..67bdda81 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -82,6 +82,7 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
// Recv implements v2.MessagingService_TelemetryClient
func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, error) {
mt.trace = append(mt.trace, "recv")
+ sugarBaseLogger.Info("calling recv function", "state", mt.recv_error_count, "cli", mt.cli)
if mt.recv_error_count >= 1 {
mt.recv_error_count -= 1
return nil, io.EOF
diff --git a/golang/client_test.go b/golang/client_test.go
index 69c14c74..2ed17343 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -28,7 +28,6 @@ import (
gomock "github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
@@ -74,7 +73,6 @@ func BuildCLient(t *testing.T) *defaultClient {
if err != nil {
t.Error(err)
}
- sugarBaseLogger.Info(cli)
err = cli.startUp()
if err != nil {
t.Error(err)
@@ -191,9 +189,12 @@ func Test_execute_server_telemetry_command_fail(t *testing.T) {
default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{})
// then
- require.Equal(t, 1, observedLogs.Len())
- commandExecutionLog := observedLogs.All()[0]
- assert.Equal(t, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", commandExecutionLog.Message)
+ logs := observedLogs.All()
+ messages := make([]string, len(logs))
+ for index, log := range logs {
+ messages[index] = log.Message
+ }
+ assert.Contains(t, messages, "telemetryCommand recv err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})")
}
func Test_execute_server_telemetry_command(t *testing.T) {
@@ -206,9 +207,12 @@ func Test_execute_server_telemetry_command(t *testing.T) {
default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command: &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}})
// then
- require.Equal(t, 2, observedLogs.Len())
- commandExecutionLog := observedLogs.All()[1]
- assert.Equal(t, "Executed command successfully", commandExecutionLog.Message)
+ logs := observedLogs.All()
+ messages := make([]string, len(logs))
+ for index, log := range logs {
+ messages[index] = log.Message
+ }
+ assert.Contains(t, messages, "Executed command successfully")
}
func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
@@ -228,10 +232,10 @@ func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
cli.settings = &simpleConsumerSettings{}
// when
- // we wait some time while consumer goroutine runs
time.Sleep(3 * time.Second)
// then
+ sugarBaseLogger.Info(observedLogs.All())
commandExecutionLog := observedLogs.All()[:2]
assert.Equal(t, "Executed command successfully", commandExecutionLog[0].Message)
assert.Equal(t, "Executed command successfully", commandExecutionLog[1].Message)
@@ -254,10 +258,10 @@ func TestRestoreDefaultClientSessionOneError(t *testing.T) {
cli.settings = &simpleConsumerSettings{}
// when
- // we wait some time while consumer goroutine runs
time.Sleep(3 * time.Second)
// then
+ sugarBaseLogger.Info(observedLogs.All())
commandExecutionLog := observedLogs.All()[:3]
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message)
@@ -281,10 +285,10 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
cli.settings = &simpleConsumerSettings{}
// when
- // we wait some time while consumer goroutine runs
time.Sleep(3 * time.Second)
// then
+ sugarBaseLogger.Info(observedLogs.All())
commandExecutionLog := observedLogs.All()[:2]
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Failed to recover, err=%wEOF", commandExecutionLog[1].Message)
diff --git a/golang/rpc_client.go b/golang/rpc_client.go
index 98e6863d..6a593b37 100644
--- a/golang/rpc_client.go
+++ b/golang/rpc_client.go
@@ -84,7 +84,7 @@ func (rc *rpcClient) GetTarget() string {
}
func (rc *rpcClient) idleDuration() time.Duration {
- return time.Now().Sub(rc.activityNanoTime)
+ return time.Since(rc.activityNanoTime)
}
func (rc *rpcClient) Close() {}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index c6918f55..dfc72320 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -269,7 +269,7 @@ func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum int3
}
sc.subscriptionExpressionsLock.RLock()
topics := make([]string, 0, len(sc.subscriptionExpressions))
- for k, _ := range sc.subscriptionExpressions {
+ for k := range sc.subscriptionExpressions {
topics = append(topics, k)
}
sc.subscriptionExpressionsLock.RUnlock()
@@ -305,7 +305,7 @@ func (sc *defaultSimpleConsumer) isClient() {
}
func (sc *defaultSimpleConsumer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) error {
- return fmt.Errorf("Ignore orphaned transaction recovery command from remote, which is not expected, client id=%s, command=%v", sc.cli.clientID, command)
+ return fmt.Errorf("ignore orphaned transaction recovery command from remote, which is not expected, client id=%s, command=%v", sc.cli.clientID, command)
}
func (sc *defaultSimpleConsumer) onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error {
@@ -344,7 +344,7 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
sc.subscriptionExpressions = make(map[string]*FilterExpression)
}
sc.cli.initTopics = make([]string, 0)
- for topic, _ := range scOpts.subscriptionExpressions {
+ for topic := range scOpts.subscriptionExpressions {
sc.cli.initTopics = append(sc.cli.initTopics, topic)
}
endpoints, err := utils.ParseTarget(config.Endpoint)