You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 08:19:10 UTC
[plc4x] branch develop updated: test(plc4go/cbus): fix concurrency issue in test.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 80a0d8ef1b test(plc4go/cbus): fix concurrency issue in test.
80a0d8ef1b is described below
commit 80a0d8ef1b6e83b10fac637d87f87aec91ebe088
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 10:18:52 2023 +0200
test(plc4go/cbus): fix concurrency issue in test.
Transition away from using a WaitGroup on addResponseCode or addPlcValue and instead hook a channel close into the end of the transaction (EndRequest or FailRequest), as this is the real marker that the transaction has ended (which is what we are waiting for at the end of the test). This has two benefits: 1. no more flaky tests, and 2. if we forget to properly end a transaction, we will notice and need to fix that in prod code.
---
plc4go/internal/cbus/Reader.go | 5 +-
plc4go/internal/cbus/Reader_test.go | 146 +++++++++++++++++-------------------
2 files changed, 73 insertions(+), 78 deletions(-)
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 0b3cedc4ff..2382a20e1b 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
"sync"
"time"
@@ -200,7 +201,9 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
// TODO: check if we can use a plcValueSerializer
encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
- return errors.Wrap(err, "error encoding reply")
+ log.Error().Err(err).Msg("error encoding reply")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.EndRequest()
}
return transaction.EndRequest()
}, func(err error) error {
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index bb93973550..b5baf829e0 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -29,12 +29,12 @@ import (
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"net/url"
"strings"
- "sync"
"sync/atomic"
"testing"
"time"
@@ -405,16 +405,15 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
ctx context.Context
transaction transactions.RequestTransaction
messageToSend readWriteModel.CBusMessage
- addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
+ addResponseCode func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode)
tagName string
- addPlcValue func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
+ addPlcValue func(t *testing.T) func(name string, plcValue apiValues.PlcValue)
}
tests := []struct {
name string
fields fields
args args
- setup func(t *testing.T, fields *fields, args *args)
- wg *sync.WaitGroup
+ setup func(t *testing.T, fields *fields, args *args, ch chan struct{})
}{
{
name: "Send message empty message",
@@ -428,23 +427,21 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
return timeout
}(),
messageToSend: nil,
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
loggerOption := options.WithCustomLogger(testutils.ProduceTestingLogger(t))
@@ -471,10 +468,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.FailRequest(mock.Anything).Return(errors.New("no I say"))
+ expect.FailRequest(mock.Anything).Return(errors.New("no I say")).Run(func(_ error) {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with message to client",
@@ -543,31 +541,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.FailRequest(mock.Anything).Return(errors.New("Nope"))
+ expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with server error",
@@ -636,31 +633,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with too many retransmissions",
@@ -692,26 +688,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -759,7 +755,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with corruption",
@@ -791,26 +786,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -858,7 +853,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with sync loss",
@@ -890,26 +884,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -957,7 +951,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with too long",
@@ -989,26 +982,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1056,7 +1049,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with confirm only",
@@ -1088,26 +1080,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_NOT_FOUND, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1155,7 +1147,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with ok",
@@ -1187,26 +1178,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_OK, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
return func(name string, plcValue apiValues.PlcValue) {
t.Logf("Got response %s for %s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1254,28 +1245,29 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: func() *sync.WaitGroup {
- wg := &sync.WaitGroup{}
- wg.Add(1) // We getting an response and a value
- return wg
- }(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ ch := make(chan struct{})
if tt.setup != nil {
- tt.setup(t, &tt.fields, &tt.args)
+ tt.setup(t, &tt.fields, &tt.args, ch)
}
m := &Reader{
alphaGenerator: tt.fields.alphaGenerator,
messageCodec: tt.fields.messageCodec,
tm: tt.fields.tm,
}
- tt.wg.Add(1)
- m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+ m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t))
t.Log("Waiting now")
- tt.wg.Wait() // TODO: we need to timeout this too
- t.Log("Done waiting")
+ timer := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-ch:
+ t.Log("Done waiting")
+ case <-timer.C:
+ t.Error("Timeout")
+ }
})
}
}