You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 12:10:05 UTC
[plc4x] branch develop updated: test(plc4go/cbus): fix concurrency issue with connection test
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 06c79b35c9 test(plc4go/cbus): fix concurrency issue with connection test
06c79b35c9 is described below
commit 06c79b35c9fde10e04b7d89fac7f76fcebbbee04
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 14:09:57 2023 +0200
test(plc4go/cbus): fix concurrency issue with connection test
---
plc4go/internal/cbus/Connection.go | 99 +++++++++---------
plc4go/internal/cbus/Connection_test.go | 178 ++++++++++++++++++++++++--------
plc4go/internal/cbus/DriverContext.go | 5 +-
3 files changed, 192 insertions(+), 90 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 154a2b299a..5b4f767815 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -310,64 +310,71 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
receivedResetEchoChan := make(chan bool, 1)
receivedResetEchoErrorChan := make(chan error, 1)
- if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
- switch message := message.(type) {
- case readWriteModel.CBusMessageToClientExactly:
- switch reply := message.GetReply().(type) {
- case readWriteModel.ReplyOrConfirmationReplyExactly:
- switch reply.GetReply().(type) {
- case readWriteModel.PowerUpReplyExactly:
- c.log.Debug().Msg("Received a PUN reply")
- return true
+ if err := c.messageCodec.SendRequest(
+ ctx,
+ cBusMessage,
+ func(message spi.Message) bool {
+ c.log.Trace().Msg("Checking message")
+ switch message := message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ switch reply := message.GetReply().(type) {
+ case readWriteModel.ReplyOrConfirmationReplyExactly:
+ switch reply.GetReply().(type) {
+ case readWriteModel.PowerUpReplyExactly:
+ c.log.Debug().Msg("Received a PUN reply")
+ return true
+ default:
+ c.log.Trace().Msgf("%T not relevant", reply)
+ return false
+ }
default:
c.log.Trace().Msgf("%T not relevant", reply)
return false
}
+ case readWriteModel.CBusMessageToServerExactly:
+ switch request := message.GetRequest().(type) {
+ case readWriteModel.RequestResetExactly:
+ c.log.Debug().Msg("Received a Reset reply")
+ return true
+ default:
+ c.log.Trace().Msgf("%T not relevant", request)
+ return false
+ }
default:
- c.log.Trace().Msgf("%T not relevant", reply)
- return false
- }
- case readWriteModel.CBusMessageToServerExactly:
- switch request := message.GetRequest().(type) {
- case readWriteModel.RequestResetExactly:
- c.log.Debug().Msg("Received a Reset reply")
- return true
- default:
- c.log.Trace().Msgf("%T not relevant", request)
+ c.log.Trace().Msgf("%T not relevant", message)
return false
}
- default:
- c.log.Trace().Msgf("%T not relevant", message)
- return false
- }
- }, func(message spi.Message) error {
- switch message.(type) {
- case readWriteModel.CBusMessageToClientExactly:
- // This is the powerup notification
- select {
- case receivedResetEchoChan <- false:
- c.log.Trace().Msg("notified reset chan from message to client")
+ },
+ func(message spi.Message) error {
+ c.log.Trace().Msg("Handling message")
+ switch message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ // This is the powerup notification
+ select {
+ case receivedResetEchoChan <- false:
+ c.log.Trace().Msg("notified reset chan from message to client")
+ default:
+ }
+ case readWriteModel.CBusMessageToServerExactly:
+ // This is the echo
+ select {
+ case receivedResetEchoChan <- true:
+ c.log.Trace().Msg("notified reset chan from message to server")
+ default:
+ }
default:
+ return errors.Errorf("Unmapped type %T", message)
}
- case readWriteModel.CBusMessageToServerExactly:
- // This is the echo
+ return nil
+ },
+ func(err error) error {
select {
- case receivedResetEchoChan <- true:
- c.log.Trace().Msg("notified reset chan from message to server")
+ case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
+ c.log.Trace().Msg("notified error chan")
default:
}
- default:
- return errors.Errorf("Unmapped type %T", message)
- }
- return nil
- }, func(err error) error {
- select {
- case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
- c.log.Trace().Msg("notified error chan")
- default:
- }
- return nil
- }, c.GetTtl()); err != nil {
+ return nil
+ }, c.GetTtl()); err != nil {
if sendOutErrorNotification {
c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
} else {
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 366d0b22b3..073904751b 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -21,6 +21,7 @@ package cbus
import (
"context"
+ "encoding/hex"
"github.com/stretchr/testify/require"
"net/url"
"sync"
@@ -1379,27 +1380,35 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
func TestConnection_setupConnection(t *testing.T) {
type fields struct {
- DefaultConnection _default.DefaultConnection
- messageCodec *MessageCodec
- subscribers []*Subscriber
- tm transactions.RequestTransactionManager
- configuration Configuration
- driverContext DriverContext
- connectionId string
- tracer tracer.Tracer
+ _DefaultConnection _default.DefaultConnection
+ messageCodec *MessageCodec
+ subscribers []*Subscriber
+ tm transactions.RequestTransactionManager
+ configuration Configuration
+ driverContext DriverContext
+ connectionId string
+ tracer tracer.Tracer
}
type args struct {
ctx context.Context
ch chan plc4go.PlcConnectionConnectResult
}
tests := []struct {
- name string
- fields fields
- args args
- setup func(t *testing.T, fields *fields)
+ name string
+ fields fields
+ args args
+ setup func(t *testing.T, fields *fields)
+ manipulator func(t *testing.T, connection *Connection)
+ validator func(t *testing.T, result plc4go.PlcConnectionConnectResult)
}{
{
name: "setup connection (failing)",
+ fields: fields{
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
+ },
args: args{
ctx: testutils.TestContext(t),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1407,20 +1416,33 @@ func TestConnection_setupConnection(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- // Setup connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
require.NoError(t, err)
codec := NewMessageCodec(ti, _options...)
+ require.NoError(t, codec.Connect())
t.Cleanup(func() {
assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.Error(t, result.GetErr())
+ },
},
{
name: "setup connection (failing after reset)",
+ fields: fields{
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
+ },
args: args{
ctx: testutils.TestContext(t),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1428,9 +1450,6 @@ func TestConnection_setupConnection(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- // Build the default connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
@@ -1464,19 +1483,29 @@ func TestConnection_setupConnection(t *testing.T) {
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.Error(t, result.GetErr())
+ },
},
{
name: "setup connection (failing after app filters)",
+ fields: fields{
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
+ },
args: args{
ctx: testutils.TestContext(t),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
-
- // Build the default connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
@@ -1522,9 +1551,23 @@ func TestConnection_setupConnection(t *testing.T) {
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.Error(t, result.GetErr())
+ },
},
{
name: "setup connection (failing after interface options 3",
+ fields: fields{
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
+ },
args: args{
ctx: testutils.TestContext(t),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1532,9 +1575,6 @@ func TestConnection_setupConnection(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- // Build the default connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
@@ -1552,6 +1592,7 @@ func TestConnection_setupConnection(t *testing.T) {
currentState.Store(RESET)
stateChangeMutex := sync.Mutex{}
ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("Reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -1586,9 +1627,23 @@ func TestConnection_setupConnection(t *testing.T) {
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.Error(t, result.GetErr())
+ },
},
{
name: "setup connection (failing after interface options 1 pun)",
+ fields: fields{
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
+ },
args: args{
ctx: testutils.TestContext(t),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1596,9 +1651,6 @@ func TestConnection_setupConnection(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- // Build the default connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
// Build the message codec
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
@@ -1656,11 +1708,22 @@ func TestConnection_setupConnection(t *testing.T) {
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.Error(t, result.GetErr())
+ },
},
{
name: "setup connection",
fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ },
},
args: args{
ctx: testutils.TestContext(t),
@@ -1669,9 +1732,6 @@ func TestConnection_setupConnection(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- // Build the default connection
- fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
// Build the message codec
transport := test.NewTransport(_options...)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
@@ -1730,10 +1790,19 @@ func TestConnection_setupConnection(t *testing.T) {
codec := NewMessageCodec(ti, _options...)
require.NoError(t, codec.Connect())
t.Cleanup(func() {
- assert.NoError(t, codec.Disconnect())
+ assert.Error(t, codec.Disconnect())
})
fields.messageCodec = codec
},
+ manipulator: func(t *testing.T, connection *Connection) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+ },
+ validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+ assert.NotNil(t, result)
+ assert.NoError(t, result.GetErr())
+ assert.NotNil(t, result.GetConnection())
+ },
},
}
for _, tt := range tests {
@@ -1742,20 +1811,43 @@ func TestConnection_setupConnection(t *testing.T) {
tt.setup(t, &tt.fields)
}
c := &Connection{
- DefaultConnection: tt.fields.DefaultConnection,
- messageCodec: tt.fields.messageCodec,
- subscribers: tt.fields.subscribers,
- tm: tt.fields.tm,
- configuration: tt.fields.configuration,
- driverContext: tt.fields.driverContext,
- connectionId: tt.fields.connectionId,
- tracer: tt.fields.tracer,
- log: testutils.ProduceTestingLogger(t),
+ messageCodec: tt.fields.messageCodec,
+ subscribers: tt.fields.subscribers,
+ tm: tt.fields.tm,
+ configuration: tt.fields.configuration,
+ driverContext: tt.fields.driverContext,
+ connectionId: tt.fields.connectionId,
+ tracer: tt.fields.tracer,
+ log: testutils.ProduceTestingLogger(t),
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, c)
}
c.setupConnection(tt.args.ctx, tt.args.ch)
+ assert.NotNil(t, tt.args.ch, "We always need a result channel")
+ chanTimeout := time.NewTimer(10 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(chanTimeout)
+ })
+ select {
+ case <-chanTimeout.C:
+ t.Fatal("setup connection doesn't fill chan in time")
+ case result := <-tt.args.ch:
+ if tt.validator != nil {
+ tt.validator(t, result)
+ }
+ }
// To shut down properly we always do that
- c.SetConnected(false)
- c.handlerWaitGroup.Wait()
+ closeTimeout := time.NewTimer(10 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(closeTimeout)
+ })
+ select {
+ case <-closeTimeout.C:
+ t.Fatal("close didn't react in time")
+ case <-c.Close():
+ t.Log("connection closed")
+ }
})
}
}
diff --git a/plc4go/internal/cbus/DriverContext.go b/plc4go/internal/cbus/DriverContext.go
index a53f2fa8b5..041f4214e2 100644
--- a/plc4go/internal/cbus/DriverContext.go
+++ b/plc4go/internal/cbus/DriverContext.go
@@ -26,5 +26,8 @@ type DriverContext struct {
}
func NewDriverContext(_ Configuration) DriverContext {
- return DriverContext{}
+ return DriverContext{
+ awaitSetupComplete: true,
+ awaitDisconnectComplete: true,
+ }
}