You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 14:33:10 UTC

[plc4x] branch develop updated: test(plc4go/cbus): cleanup resources

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 474d17901b test(plc4go/cbus): cleanup resources
474d17901b is described below

commit 474d17901b04f164d9300cc4687ad642085b6bfa
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 16:33:02 2023 +0200

    test(plc4go/cbus): cleanup resources
---
 plc4go/internal/cbus/Connection_test.go | 370 +++++++++++++++++++++++---------
 plc4go/internal/cbus/Discoverer.go      |   6 +
 plc4go/internal/cbus/Subscriber_test.go |  34 ++-
 plc4go/spi/default/DefaultConnection.go |  11 +-
 4 files changed, 313 insertions(+), 108 deletions(-)

diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 341a2a275c..80d0e04a71 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"net/url"
@@ -185,7 +186,7 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 
 				// Build the default connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport(loggerOption)
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
 					if err != nil {
@@ -194,6 +195,10 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 					}
 					return ti
 				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				assert.NotNil(t, results)
@@ -829,13 +834,22 @@ func TestConnection_fireConnectionError(t *testing.T) {
 		name          string
 		fields        fields
 		args          args
+		setup         func(t *testing.T, fields *fields, args *args)
 		chanValidator func(*testing.T, chan<- plc4go.PlcConnectionConnectResult) bool
 	}{
 		{
 			name: "instant connect",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
+			setup: func(t *testing.T, fields *fields, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport()
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
 					if err != nil {
@@ -843,7 +857,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
 						t.FailNow()
 					}
 					return ti
-				}()),
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			chanValidator: func(_ *testing.T, _ chan<- plc4go.PlcConnectionConnectResult) bool {
 				return true
@@ -852,8 +870,21 @@ func TestConnection_fireConnectionError(t *testing.T) {
 		{
 			name: "notified connect",
 			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
+				driverContext: DriverContext{
+					awaitSetupComplete: true,
+				},
+			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport()
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
 					if err != nil {
@@ -861,10 +892,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
 						t.FailNow()
 					}
 					return ti
-				}()),
-				driverContext: DriverContext{
-					awaitSetupComplete: true,
-				},
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			args: args{ch: make(chan<- plc4go.PlcConnectionConnectResult, 1)},
 			chanValidator: func(t *testing.T, results chan<- plc4go.PlcConnectionConnectResult) bool {
@@ -875,6 +907,9 @@ func TestConnection_fireConnectionError(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			c := &Connection{
 				DefaultConnection: tt.fields.DefaultConnection,
 				messageCodec:      tt.fields.messageCodec,
@@ -946,7 +981,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
 				loggerOption := options.WithCustomLogger(logger)
 
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport(loggerOption)
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
 					if err != nil {
@@ -955,6 +990,10 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
 					}
 					return ti
 				}())
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			want: false,
 		},
@@ -1003,22 +1042,11 @@ func TestConnection_sendReset(t *testing.T) {
 		name   string
 		fields fields
 		args   args
+		setup  func(t *testing.T, fields *fields, args *args)
 		wantOk bool
 	}{
 		{
 			name: "send reset",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}()),
-			},
 			args: args{
 				ctx: context.Background(),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1032,11 +1060,38 @@ func TestConnection_sendReset(t *testing.T) {
 				}(),
 				sendOutErrorNotification: false,
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
+					transport := test.NewTransport()
+					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+					}
+					return ti
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
+			},
 			wantOk: false,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			c := &Connection{
 				DefaultConnection: tt.fields.DefaultConnection,
 				messageCodec:      tt.fields.messageCodec,
@@ -1104,7 +1159,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
 
 				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-				fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport(loggerOption)
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
 					if err != nil {
@@ -1113,6 +1168,10 @@ func TestConnection_setApplicationFilter(t *testing.T) {
 					}
 					return ti
 				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			wantOk: false,
 		},
@@ -1165,18 +1224,6 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
 	}{
 		{
 			name: "set interface 1 PUN options (failing)",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}()),
-			},
 			args: args{
 				ctx: context.Background(),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1190,7 +1237,30 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
 				}(),
 			},
 			setup: func(t *testing.T, fields *fields, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				// Custom option for that
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Setup connection
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
+					transport := test.NewTransport(loggerOption)
+					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+					}
+					return ti
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			wantOk: false,
 		},
@@ -1243,18 +1313,6 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
 	}{
 		{
 			name: "set interface 1 options (failing)",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}()),
-			},
 			args: args{
 				ctx: context.Background(),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1268,7 +1326,30 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
 				}(),
 			},
 			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				// Custom option for that
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Setup connection
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
+					transport := test.NewTransport(loggerOption)
+					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+					}
+					return ti
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			want: false,
 		},
@@ -1321,18 +1402,6 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
 	}{
 		{
 			name: "set interface 3 options (failing)",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
-				messageCodec: NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-					if err != nil {
-						t.Error(err)
-						t.FailNow()
-					}
-					return ti
-				}()),
-			},
 			args: args{
 				ctx: context.Background(),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1346,7 +1415,30 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
 				}(),
 			},
 			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				// Custom option for that
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Setup connection
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+				codec := NewMessageCodec(func() transports.TransportInstance {
+					transport := test.NewTransport(loggerOption)
+					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+					}
+					return ti
+				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 			wantOk: false,
 		},
@@ -1410,19 +1502,21 @@ func TestConnection_setupConnection(t *testing.T) {
 				// Custom option for that
 				loggerOption := options.WithCustomLogger(logger)
 
-				// Build the default connection
+				// Setup connection
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
-
-				// Build the message codec
-				fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
-					transport := test.NewTransport()
-					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+				codec := NewMessageCodec(func() transports.TransportInstance {
+					transport := test.NewTransport(loggerOption)
+					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
 					if err != nil {
 						t.Error(err)
 						t.FailNow()
 					}
 					return ti
 				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 		},
 		{
@@ -1475,6 +1569,9 @@ func TestConnection_setupConnection(t *testing.T) {
 					t.Error(err)
 					t.FailNow()
 				}
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
 
 				fields.messageCodec = codec
 			},
@@ -1541,6 +1638,9 @@ func TestConnection_setupConnection(t *testing.T) {
 					t.Error(err)
 					t.FailNow()
 				}
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
 
 				fields.messageCodec = codec
 			},
@@ -1613,6 +1713,9 @@ func TestConnection_setupConnection(t *testing.T) {
 					t.Error(err)
 					t.FailNow()
 				}
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
 
 				fields.messageCodec = codec
 			},
@@ -1691,6 +1794,10 @@ func TestConnection_setupConnection(t *testing.T) {
 					t.Error(err)
 					t.FailNow()
 				}
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+
 				fields.messageCodec = codec
 			},
 		},
@@ -1755,12 +1862,15 @@ func TestConnection_setupConnection(t *testing.T) {
 						}
 					})
 					codec := NewMessageCodec(transportInstance)
-					err = codec.Connect()
-					if err != nil {
+					if err = codec.Connect(); err != nil {
 						t.Error(err)
 						t.FailNow()
 						return nil
 					}
+					t.Cleanup(func() {
+						assert.NoError(t, codec.Disconnect())
+					})
+
 					return codec
 				}(),
 			},
@@ -1782,7 +1892,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
 
 				// Build the message codec
-				fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+				codec := NewMessageCodec(func() transports.TransportInstance {
 					transport := test.NewTransport()
 					ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
 					if err != nil {
@@ -1791,6 +1901,10 @@ func TestConnection_setupConnection(t *testing.T) {
 					}
 					return ti
 				}(), loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 		},
 	}
@@ -1834,47 +1948,69 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 	}{
 		{
 			name: "just start",
-			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
+			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
 			},
 		},
 		{
 			name: "just start and feed (no subs)",
-			fields: fields{
-				DefaultConnection: func() _default.DefaultConnection {
-					defaultConnection := _default.NewDefaultConnection(nil)
-					defaultConnection.SetConnected(true)
-					return defaultConnection
-				}(),
-				messageCodec: func() *MessageCodec {
-					messageCodec := NewMessageCodec(nil)
-					go func() {
-						messageCodec.monitoredMMIs <- nil
-						messageCodec.monitoredSALs <- nil
-					}()
-					return messageCodec
-				}(),
+			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				defaultConnection := _default.NewDefaultConnection(nil, loggerOption)
+				defaultConnection.SetConnected(true)
+				fields.DefaultConnection = defaultConnection
+
+				codec := NewMessageCodec(nil, loggerOption)
+				go func() {
+					codec.monitoredMMIs <- nil
+					codec.monitoredSALs <- nil
+				}()
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 		},
 		{
 			name: "just start and feed",
-			fields: fields{
-				DefaultConnection: func() _default.DefaultConnection {
-					defaultConnection := _default.NewDefaultConnection(nil)
-					defaultConnection.SetConnected(true)
-					return defaultConnection
-				}(),
-				messageCodec: func() *MessageCodec {
-					messageCodec := NewMessageCodec(nil)
-					go func() {
-						messageCodec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
-						messageCodec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
-					}()
-					return messageCodec
-				}(),
-			},
 			setup: func(t *testing.T, fields *fields) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				fields.log = logger
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				defaultConnection := _default.NewDefaultConnection(nil, loggerOption)
+				defaultConnection.SetConnected(true)
+				fields.DefaultConnection = defaultConnection
+
 				fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
+				codec := NewMessageCodec(nil, loggerOption)
+				go func() {
+					codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
+					codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
+				}()
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				fields.messageCodec = codec
 			},
 		},
 	}
@@ -1909,10 +2045,25 @@ func TestNewConnection(t *testing.T) {
 	tests := []struct {
 		name       string
 		args       args
+		setup      func(t *testing.T, args *args)
 		wantAssert func(*testing.T, *Connection) bool
 	}{
 		{
 			name: "just create the connection",
+			setup: func(t *testing.T, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				codec := NewMessageCodec(nil, loggerOption)
+				t.Cleanup(func() {
+					assert.NoError(t, codec.Disconnect())
+				})
+				args.messageCodec = codec
+			},
 			wantAssert: func(t *testing.T, connection *Connection) bool {
 				return assert.NotNil(t, connection)
 			},
@@ -1920,7 +2071,22 @@ func TestNewConnection(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			assert.True(t, tt.wantAssert(t, NewConnection(tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)), "NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
+			if tt.setup != nil {
+				tt.setup(t, &tt.args)
+			}
+			connection := NewConnection(tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
+			t.Cleanup(func() {
+				timer := time.NewTimer(1 * time.Second)
+				t.Cleanup(func() {
+					utils.CleanupTimer(timer)
+				})
+				select {
+				case <-connection.Close():
+				case <-timer.C:
+					t.Error("timeout")
+				}
+			})
+			assert.True(t, tt.wantAssert(t, connection), "NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)
 		})
 	}
 }
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 2032b0bae5..c6495aaa7f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -213,6 +213,12 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
 			transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
 			return
 		}
+		defer func() {
+			// Disconnect codec when done
+			if err := codec.Disconnect(); err != nil {
+				d.log.Warn().Err(err).Msg("Error disconnecting codec")
+			}
+		}()
 
 		// Prepare the discovery packet data
 		cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
diff --git a/plc4go/internal/cbus/Subscriber_test.go b/plc4go/internal/cbus/Subscriber_test.go
index 6ffc4f7425..d5d794c950 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,6 +21,9 @@ package cbus
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"testing"
 	"time"
 
@@ -67,17 +70,39 @@ func TestSubscriber_Subscribe(t *testing.T) {
 		name         string
 		fields       fields
 		args         args
+		setup        func(t *testing.T, fields *fields, args *args)
 		wantAsserter func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool
 	}{
 		{
 			name: "just subscribe",
-			fields: fields{
-				connection: NewConnection(nil, Configuration{}, DriverContext{}, nil, nil, nil),
-			},
 			args: args{
 				in0:                 context.Background(),
 				subscriptionRequest: spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, map[string]apiModel.PlcTag{"blub": NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				// Setup logger
+				logger := testutils.ProduceTestingLogger(t)
+
+				loggerOption := options.WithCustomLogger(logger)
+
+				// Set the model logger to the logger above
+				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+				codec := NewMessageCodec(nil, loggerOption)
+				connection := NewConnection(codec, Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
+				t.Cleanup(func() {
+					timer := time.NewTimer(1 * time.Second)
+					t.Cleanup(func() {
+						utils.CleanupTimer(timer)
+					})
+					select {
+					case <-connection.Close():
+					case <-timer.C:
+						t.Error("timeout")
+					}
+				})
+				fields.connection = connection
+			},
 			wantAsserter: func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
 				defer timer.Stop()
@@ -94,6 +119,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			m := &Subscriber{
 				connection: tt.fields.connection,
 				consumers:  tt.fields.consumers,
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 492ae8be56..17f8715daa 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -271,10 +271,15 @@ func (d *defaultConnection) BlockingClose() {
 
 func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
 	d.log.Trace().Msg("close connection")
-	if err := d.GetMessageCodec().Disconnect(); err != nil {
-		d.log.Warn().Err(err).Msg("Error disconnecting message code")
+	if messageCodec := d.GetMessageCodec(); messageCodec != nil {
+		if err := messageCodec.Disconnect(); err != nil {
+			d.log.Warn().Err(err).Msg("Error disconnecting message code")
+		}
+	}
+	var err error
+	if transportInstance := d.GetTransportInstance(); transportInstance != nil {
+		err = transportInstance.Close()
 	}
-	err := d.GetTransportInstance().Close()
 	d.SetConnected(false)
 	ch := make(chan plc4go.PlcConnectionCloseResult, 1)
 	ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)