You are viewing a plain text version of this content. The canonical version is linked from the original archive page (the hyperlink target is not present in this plain-text copy).
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 14:33:10 UTC
[plc4x] branch develop updated: test(plc4go/cbus): cleanup resources
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 474d17901b test(plc4go/cbus): cleanup resources
474d17901b is described below
commit 474d17901b04f164d9300cc4687ad642085b6bfa
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 16:33:02 2023 +0200
test(plc4go/cbus): cleanup resources
---
plc4go/internal/cbus/Connection_test.go | 370 +++++++++++++++++++++++---------
plc4go/internal/cbus/Discoverer.go | 6 +
plc4go/internal/cbus/Subscriber_test.go | 34 ++-
plc4go/spi/default/DefaultConnection.go | 11 +-
4 files changed, 313 insertions(+), 108 deletions(-)
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 341a2a275c..80d0e04a71 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"net/url"
@@ -185,7 +186,7 @@ func TestConnection_ConnectWithContext(t *testing.T) {
// Build the default connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport(loggerOption)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
if err != nil {
@@ -194,6 +195,10 @@ func TestConnection_ConnectWithContext(t *testing.T) {
}
return ti
}(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
assert.NotNil(t, results)
@@ -829,13 +834,22 @@ func TestConnection_fireConnectionError(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
chanValidator func(*testing.T, chan<- plc4go.PlcConnectionConnectResult) bool
}{
{
name: "instant connect",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
+ setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport()
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
if err != nil {
@@ -843,7 +857,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
t.FailNow()
}
return ti
- }()),
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
chanValidator: func(_ *testing.T, _ chan<- plc4go.PlcConnectionConnectResult) bool {
return true
@@ -852,8 +870,21 @@ func TestConnection_fireConnectionError(t *testing.T) {
{
name: "notified connect",
fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
+ driverContext: DriverContext{
+ awaitSetupComplete: true,
+ },
+ },
+ setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport()
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
if err != nil {
@@ -861,10 +892,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
t.FailNow()
}
return ti
- }()),
- driverContext: DriverContext{
- awaitSetupComplete: true,
- },
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
args: args{ch: make(chan<- plc4go.PlcConnectionConnectResult, 1)},
chanValidator: func(t *testing.T, results chan<- plc4go.PlcConnectionConnectResult) bool {
@@ -875,6 +907,9 @@ func TestConnection_fireConnectionError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -946,7 +981,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
loggerOption := options.WithCustomLogger(logger)
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport(loggerOption)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
if err != nil {
@@ -955,6 +990,10 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
}
return ti
}())
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
want: false,
},
@@ -1003,22 +1042,11 @@ func TestConnection_sendReset(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
name: "send reset",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1032,11 +1060,38 @@ func TestConnection_sendReset(t *testing.T) {
}(),
sendOutErrorNotification: false,
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport()
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
+ },
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1104,7 +1159,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
// Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
- fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport(loggerOption)
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
if err != nil {
@@ -1113,6 +1168,10 @@ func TestConnection_setApplicationFilter(t *testing.T) {
}
return ti
}(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
wantOk: false,
},
@@ -1165,18 +1224,6 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
}{
{
name: "set interface 1 PUN options (failing)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1190,7 +1237,30 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
}(),
},
setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Setup connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
wantOk: false,
},
@@ -1243,18 +1313,6 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
}{
{
name: "set interface 1 options (failing)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1268,7 +1326,30 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
}(),
},
setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Setup connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
want: false,
},
@@ -1321,18 +1402,6 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
}{
{
name: "set interface 3 options (failing)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1346,7 +1415,30 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
}(),
},
setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Setup connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ codec := NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
wantOk: false,
},
@@ -1410,19 +1502,21 @@ func TestConnection_setupConnection(t *testing.T) {
// Custom option for that
loggerOption := options.WithCustomLogger(logger)
- // Build the default connection
+ // Setup connection
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-
- // Build the message codec
- fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+ codec := NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
if err != nil {
t.Error(err)
t.FailNow()
}
return ti
}(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
},
{
@@ -1475,6 +1569,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
fields.messageCodec = codec
},
@@ -1541,6 +1638,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
fields.messageCodec = codec
},
@@ -1613,6 +1713,9 @@ func TestConnection_setupConnection(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
fields.messageCodec = codec
},
@@ -1691,6 +1794,10 @@ func TestConnection_setupConnection(t *testing.T) {
t.Error(err)
t.FailNow()
}
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+
fields.messageCodec = codec
},
},
@@ -1755,12 +1862,15 @@ func TestConnection_setupConnection(t *testing.T) {
}
})
codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
+ if err = codec.Connect(); err != nil {
t.Error(err)
t.FailNow()
return nil
}
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+
return codec
}(),
},
@@ -1782,7 +1892,7 @@ func TestConnection_setupConnection(t *testing.T) {
fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
// Build the message codec
- fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ codec := NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport()
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
if err != nil {
@@ -1791,6 +1901,10 @@ func TestConnection_setupConnection(t *testing.T) {
}
return ti
}(), loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
},
}
@@ -1834,47 +1948,69 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
}{
{
name: "just start",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
},
},
{
name: "just start and feed (no subs)",
- fields: fields{
- DefaultConnection: func() _default.DefaultConnection {
- defaultConnection := _default.NewDefaultConnection(nil)
- defaultConnection.SetConnected(true)
- return defaultConnection
- }(),
- messageCodec: func() *MessageCodec {
- messageCodec := NewMessageCodec(nil)
- go func() {
- messageCodec.monitoredMMIs <- nil
- messageCodec.monitoredSALs <- nil
- }()
- return messageCodec
- }(),
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ defaultConnection := _default.NewDefaultConnection(nil, loggerOption)
+ defaultConnection.SetConnected(true)
+ fields.DefaultConnection = defaultConnection
+
+ codec := NewMessageCodec(nil, loggerOption)
+ go func() {
+ codec.monitoredMMIs <- nil
+ codec.monitoredSALs <- nil
+ }()
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
},
{
name: "just start and feed",
- fields: fields{
- DefaultConnection: func() _default.DefaultConnection {
- defaultConnection := _default.NewDefaultConnection(nil)
- defaultConnection.SetConnected(true)
- return defaultConnection
- }(),
- messageCodec: func() *MessageCodec {
- messageCodec := NewMessageCodec(nil)
- go func() {
- messageCodec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
- messageCodec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
- }()
- return messageCodec
- }(),
- },
setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ defaultConnection := _default.NewDefaultConnection(nil, loggerOption)
+ defaultConnection.SetConnected(true)
+ fields.DefaultConnection = defaultConnection
+
fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
+ codec := NewMessageCodec(nil, loggerOption)
+ go func() {
+ codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
+ codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
+ }()
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ fields.messageCodec = codec
},
},
}
@@ -1909,10 +2045,25 @@ func TestNewConnection(t *testing.T) {
tests := []struct {
name string
args args
+ setup func(t *testing.T, args *args)
wantAssert func(*testing.T, *Connection) bool
}{
{
name: "just create the connection",
+ setup: func(t *testing.T, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ codec := NewMessageCodec(nil, loggerOption)
+ t.Cleanup(func() {
+ assert.NoError(t, codec.Disconnect())
+ })
+ args.messageCodec = codec
+ },
wantAssert: func(t *testing.T, connection *Connection) bool {
return assert.NotNil(t, connection)
},
@@ -1920,7 +2071,22 @@ func TestNewConnection(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- assert.True(t, tt.wantAssert(t, NewConnection(tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)), "NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
+ if tt.setup != nil {
+ tt.setup(t, &tt.args)
+ }
+ connection := NewConnection(tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
+ t.Cleanup(func() {
+ timer := time.NewTimer(1 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(timer)
+ })
+ select {
+ case <-connection.Close():
+ case <-timer.C:
+ t.Error("timeout")
+ }
+ })
+ assert.True(t, tt.wantAssert(t, connection), "NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
})
}
}
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 2032b0bae5..c6495aaa7f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -213,6 +213,12 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
return
}
+ defer func() {
+ // Disconnect codec when done
+ if err := codec.Disconnect(); err != nil {
+ d.log.Warn().Err(err).Msg("Error disconnecting codec")
+ }
+ }()
// Prepare the discovery packet data
cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
diff --git a/plc4go/internal/cbus/Subscriber_test.go b/plc4go/internal/cbus/Subscriber_test.go
index 6ffc4f7425..d5d794c950 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,6 +21,9 @@ package cbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"testing"
"time"
@@ -67,17 +70,39 @@ func TestSubscriber_Subscribe(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantAsserter func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool
}{
{
name: "just subscribe",
- fields: fields{
- connection: NewConnection(nil, Configuration{}, DriverContext{}, nil, nil, nil),
- },
args: args{
in0: context.Background(),
subscriptionRequest: spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, map[string]apiModel.PlcTag{"blub": NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ codec := NewMessageCodec(nil, loggerOption)
+ connection := NewConnection(codec, Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
+ t.Cleanup(func() {
+ timer := time.NewTimer(1 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(timer)
+ })
+ select {
+ case <-connection.Close():
+ case <-timer.C:
+ t.Error("timeout")
+ }
+ })
+ fields.connection = connection
+ },
wantAsserter: func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
@@ -94,6 +119,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
m := &Subscriber{
connection: tt.fields.connection,
consumers: tt.fields.consumers,
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 492ae8be56..17f8715daa 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -271,10 +271,15 @@ func (d *defaultConnection) BlockingClose() {
func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
d.log.Trace().Msg("close connection")
- if err := d.GetMessageCodec().Disconnect(); err != nil {
- d.log.Warn().Err(err).Msg("Error disconnecting message code")
+ if messageCodec := d.GetMessageCodec(); messageCodec != nil {
+ if err := messageCodec.Disconnect(); err != nil {
+ d.log.Warn().Err(err).Msg("Error disconnecting message code")
+ }
+ }
+ var err error
+ if transportInstance := d.GetTransportInstance(); transportInstance != nil {
+ err = transportInstance.Close()
}
- err := d.GetTransportInstance().Close()
d.SetConnected(false)
ch := make(chan plc4go.PlcConnectionCloseResult, 1)
ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)